Posted to issues@ozone.apache.org by "xichen01 (via GitHub)" <gi...@apache.org> on 2023/09/25 13:57:46 UTC

[GitHub] [ozone] xichen01 opened a new pull request, #5358: HDDS-9348. Improving parsing speed of DB Scanner

xichen01 opened a new pull request, #5358:
URL: https://github.com/apache/ozone/pull/5358

   ## What changes were proposed in this pull request?
   Improve the parsing speed of the DB Scanner (`ozone debug ldb scan`).
   
   ### Test
   ```bash
    ozone debug ldb --db=/tmp/metadata/om.db/ scan --column_family=keyTable -l 1000000 -o /dev/null
   ```
   Before:
   Takes about 300 s
   After:
   Takes about 5 s
   
   This is a speedup of roughly 50 to 60 times.
   
   ### Optimization method
   - Multi-threaded JSON conversion: one thread reads the RocksDB iterator, a pool of worker threads decodes records and converts them to JSON, and a separate thread writes the output
   - Replace Gson with Jackson for JSON serialization
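   
   The following is a minimal, JDK-only sketch of the ordered pipeline described above. It is not the PR's code. The class `OrderedPipeline` and the method `run` are hypothetical. Strings stand in for RocksDB key/value pairs, and `toUpperCase()` stands in for decoding plus JSON serialization. The calling thread batches records and tags each batch with an increasing sequence ID. A fixed pool transforms the batches in parallel. A single writer thread emits batch N only after batches 0 to N-1 have been emitted, so the output order matches the iterator order.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical sketch: parallel per-batch work, output in input order. */
   final class OrderedPipeline {
     // Finished batches keyed by sequence ID, waiting for their turn.
     private final Map<Long, List<String>> pending = new HashMap<>();
     private final List<String> output = new ArrayList<>();
     private long expectedSeq = 0;

     // Called by worker threads when a batch has been transformed.
     private synchronized void complete(long seq, List<String> batch) {
       pending.put(seq, batch);
       notifyAll();
     }

     // Writer loop: emit batch N only after batches 0..N-1 were emitted.
     private synchronized void writeInOrder(long totalBatches)
         throws InterruptedException {
       while (expectedSeq < totalBatches) {
         List<String> batch = pending.remove(expectedSeq);
         if (batch == null) {
           wait(); // woken by complete()
         } else {
           output.addAll(batch);
           expectedSeq++;
         }
       }
     }

     static List<String> run(List<String> records, int batchSize, int threads) {
       OrderedPipeline p = new OrderedPipeline();
       long totalBatches = (records.size() + batchSize - 1) / batchSize;
       Thread writer = new Thread(() -> {
         try {
           p.writeInOrder(totalBatches);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       }, "writer");
       writer.start();

       ExecutorService pool = Executors.newFixedThreadPool(threads);
       long seq = 0;
       // The calling thread plays the role of the RocksDB reader.
       for (int i = 0; i < records.size(); i += batchSize) {
         List<String> batch = new ArrayList<>(
             records.subList(i, Math.min(i + batchSize, records.size())));
         long id = seq++;
         pool.submit(() -> {
           List<String> out = new ArrayList<>(batch.size());
           for (String r : batch) {
             out.add(r.toUpperCase()); // stand-in for JSON serialization
           }
           p.complete(id, out);
         });
       }
       try {
         writer.join();
         pool.shutdown();
         pool.awaitTermination(10, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
       }
       return p.output;
     }
   }
   ```
   
   Batches can finish out of order, so the writer buffers them in a map keyed by sequence ID. This is the same role that `expectedSequenceId` plays in the `LogWriter` quoted later in this thread.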
   
   ## What is the link to the Apache JIRA
   https://issues.apache.org/jira/browse/HDDS-9348
   
   ## How was this patch tested?
   Existing tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1347691296


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();

Review Comment:
   Yes, `System.out` should not be closed. I have changed this.
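   
   Below is a hedged sketch of the pattern under discussion. The writer is closed only when it wraps a file it owns. When it wraps `System.out`, it is flushed but left open. `OutputTarget.withOutput` is a hypothetical helper, not part of the `DBScanner` API.
   
   ```java
   import java.io.IOException;
   import java.io.OutputStreamWriter;
   import java.io.PrintWriter;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Paths;
   import java.util.function.Consumer;

   final class OutputTarget {
     static void withOutput(String fileName, Consumer<PrintWriter> body)
         throws IOException {
       if (fileName == null) {
         // Wrap stdout and flush when done, but never close System.out.
         PrintWriter stdout = new PrintWriter(
             new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
         body.accept(stdout);
         stdout.flush();
         return;
       }
       // The file writer is owned here; try-with-resources flushes and closes it.
       try (PrintWriter file = new PrintWriter(
           Files.newBufferedWriter(Paths.get(fileName), StandardCharsets.UTF_8))) {
         body.accept(file);
       }
     }
   }
   ```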



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter, boolean schemaV3) {
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        threadCount, threadCount, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(1024), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdownNow();
+      logWriter.stop();
+      logWriter.join();
+      // End JSON object (map) or array
+      printWriter.println(withKey ? " }" : " ]");
+    }
+    return !exception;
+  }
 
+  private void processRecords(ManagedRocksIterator iterator,
+                              DBColumnFamilyDefinition dbColumnFamilyDef,
+                              LogWriter logWriter, ExecutorService threadPool,
+                              boolean schemaV3) throws InterruptedException {
     if (startKey != null) {
       iterator.get().seek(getValueObject(dbColumnFamilyDef));
     }
-
-    if (withKey) {
-      // Start JSON object (map)
-      out.print("{ ");
-    } else {
-      // Start JSON array
-      out.print("[ ");
-    }
-
+    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(batchSize);
+    // Used to ensure that the output of a multi-threaded parsed Json is in
+    // the same order as the RocksDB iterator.
+    long sequenceId = FIRST_SEQUENCE_ID;
     // Count number of keys printed so far
     long count = 0;
-    while (withinLimit(count) && iterator.get().isValid()) {
-      StringBuilder sb = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDef.getKeyCodec()
-            .fromPersistedFormat(iterator.get().key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        if (schemaV3) {
-          int index =
-              DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
-          String keyStr = key.toString();
-          if (index > keyStr.length()) {
-            err().println("Error: Invalid SchemaV3 table key length. "
-                + "Is this a V2 table? Try again with --dn-schema=V2");
-            return false;
-          }
-          String cid = keyStr.substring(0, index);
-          String blockId = keyStr.substring(index);
-          sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
-              FixedLengthStringCodec.string2Bytes(cid)) +
-              keySeparatorSchemaV3 +
-              blockId));
-        } else {
-          sb.append(gson.toJson(key));
-        }
-        sb.append(": ");
-      }
-
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      Object o = dbColumnFamilyDef.getValueCodec()
-          .fromPersistedFormat(iterator.get().value());
-      sb.append(gson.toJson(o));
-
+    List<Future<Void>> futures = new ArrayList<>();
+    while (withinLimit(count) && iterator.get().isValid() && !exception) {
+      batch.add(new ByteArrayKeyValue(
+          iterator.get().key(), iterator.get().value()));
       iterator.get().next();
-      ++count;
-      if (withinLimit(count) && iterator.get().isValid()) {
-        // If this is not the last entry, append comma
-        sb.append(", ");
+      count++;
+      if (batch.size() >= batchSize) {
+        while (logWriter.getInflightLogCount() > threadCount * 10L

Review Comment:
   There are two places here that control the queue length:
   1. The `Task` queue is bounded by the thread pool's `LinkedBlockingQueue`.
   2. `getInflightLogCount()` limits the number of results waiting for the `WriterTask`.
   
   Alternatively, `inflightLogCount` could provide end-to-end flow control. We would increment it here when a batch is submitted and decrement it after `printWriter.println()`. What do you think?
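   
   The sketch below shows the end-to-end accounting proposed above, assuming one permit per batch. The reader takes a permit before submitting a batch to the pool. The writer returns the permit only after the batch is printed. As a result, batches that are queued, executing, or awaiting output are all counted. The sketch uses a blocking `java.util.concurrent.Semaphore` instead of the PR's polled `getInflightLogCount()` counter. `EndToEndFlowControl` and `run` are hypothetical names, and adding an integer to a queue stands in for JSON serialization.
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   final class EndToEndFlowControl {
     /** Returns the peak number of batches seen between submit and print. */
     static int run(int batches, int maxInflight, int threads)
         throws InterruptedException {
       Semaphore permits = new Semaphore(maxInflight);
       AtomicInteger inflight = new AtomicInteger();
       AtomicInteger peak = new AtomicInteger();
       BlockingQueue<Integer> parsed = new LinkedBlockingQueue<>();
       ExecutorService pool = Executors.newFixedThreadPool(threads);

       Thread writer = new Thread(() -> {
         try {
           for (int i = 0; i < batches; i++) {
             parsed.take();             // "print" one parsed batch
             inflight.decrementAndGet();
             permits.release();         // decrement only after printing
           }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       }, "writer");
       writer.start();

       for (int i = 0; i < batches; i++) {
         permits.acquire();             // increment on submit; blocks while full
         peak.accumulateAndGet(inflight.incrementAndGet(), Math::max);
         final int batchId = i;
         pool.execute(() -> parsed.add(batchId)); // stand-in for JSON work
       }
       writer.join();
       pool.shutdown();
       pool.awaitTermination(10, TimeUnit.SECONDS);
       return peak.get();
     }
   }
   ```
   
   Blocking on `acquire()` also avoids the sleep-and-poll loop that a plain counter needs.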



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -364,4 +410,253 @@ private String removeTrailingSlashIfNeeded(String dbPath) {
   public Class<?> getParentType() {
     return RDBParser.class;
   }
+
+  /**
+   * Utility for centralized JSON serialization using Jackson.
+   */
+  @VisibleForTesting
+  public static class JsonSerializationHelper {
+    /**
+     * In order to maintain consistency with the original Gson output to do
+     * this setup makes the output from Jackson closely match the
+     * output of Gson.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+        // Ignore standard getters.
+        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+        // Ignore boolean "is" getters.
+        .setVisibility(PropertyAccessor.IS_GETTER,
+            JsonAutoDetect.Visibility.NONE)
+        // Exclude null values.
+        .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    public static final ObjectWriter WRITER;
+
+    static {
+      if (compact) {
+        WRITER = OBJECT_MAPPER.writer();
+      } else {
+        WRITER = OBJECT_MAPPER.writerWithDefaultPrettyPrinter();
+      }
+    }
+
+    public static ObjectWriter getWriter() {
+      return WRITER;
+    }
+  }
+
+
+  private static class Task implements Callable<Void> {
+
+    private final DBColumnFamilyDefinition dbColumnFamilyDefinition;
+    private final ArrayList<ByteArrayKeyValue> batch;
+    private final LogWriter logWriter;
+    private static final ObjectWriter WRITER =
+        JsonSerializationHelper.getWriter();
+    private final long sequenceId;
+    private final boolean withKey;
+    private final boolean schemaV3;
+
+    Task(DBColumnFamilyDefinition dbColumnFamilyDefinition,
+         ArrayList<ByteArrayKeyValue> batch, LogWriter logWriter,
+         long sequenceId, boolean withKey, boolean schemaV3) {
+      this.dbColumnFamilyDefinition = dbColumnFamilyDefinition;
+      this.batch = batch;
+      this.logWriter = logWriter;
+      this.sequenceId = sequenceId;
+      this.withKey = withKey;
+      this.schemaV3 = schemaV3;
+    }
+
+    @Override
+    public Void call() {
+      try {
+        ArrayList<String> results = new ArrayList<>(batch.size());
+        for (ByteArrayKeyValue byteArrayKeyValue : batch) {
+          StringBuilder sb = new StringBuilder();
+          if (!(sequenceId == FIRST_SEQUENCE_ID && results.isEmpty())) {
+            // Add a comma before each output entry, starting from the second
+            // one, to ensure valid JSON format.
+            sb.append(", ");
+          }
+          if (withKey) {
+            Object key = dbColumnFamilyDefinition.getKeyCodec()
+                .fromPersistedFormat(byteArrayKeyValue.getKey());
+            if (schemaV3) {
+              int index =
+                  DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
+              String keyStr = key.toString();
+              if (index > keyStr.length()) {
+                err().println("Error: Invalid SchemaV3 table key length. "
+                    + "Is this a V2 table? Try again with --dn-schema=V2");
+                exception = true;
+                break;
+              }
+              String cid = key.toString().substring(0, index);
+              String blockId = key.toString().substring(index);
+              sb.append(WRITER.writeValueAsString(LongCodec.get()
+                  .fromPersistedFormat(
+                      FixedLengthStringCodec.string2Bytes(cid)) +
+                  KEY_SEPARATOR_SCHEMA_V3 + blockId));
+            } else {
+              sb.append(WRITER.writeValueAsString(key));
+            }
+            sb.append(": ");
+          }
+
+          Object o = dbColumnFamilyDefinition.getValueCodec()
+              .fromPersistedFormat(byteArrayKeyValue.getValue());
+          sb.append(WRITER.writeValueAsString(o));
+          results.add(sb.toString());
+        }
+        logWriter.log(results, sequenceId);
+      } catch (Exception e) {
+        exception = true;
+        LOG.error("Exception parse Object", e);
+      }
+      return null;
+    }
+  }
+
+  private static class ByteArrayKeyValue {
+    private final byte[] key;
+    private final byte[] value;
+
+    ByteArrayKeyValue(byte[] key, byte[] value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public byte[] getKey() {
+      return key;
+    }
+
+    public byte[] getValue() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ByteArrayKeyValue that = (ByteArrayKeyValue) o;
+      return Arrays.equals(key, that.key);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(key);
+    }
+
+    @Override
+    public String toString() {
+      return "ByteArrayKeyValue{" +
+          "key=" + Arrays.toString(key) +
+          ", value=" + Arrays.toString(value) +
+          '}';
+    }
+  }
+
+  private static class LogWriter {
+    private final Map<Long, ArrayList<String>> logs;
+    private final PrintWriter printWriter;
+    private final Thread writerThread;
+    private volatile boolean stop = false;
+    private long expectedSequenceId = FIRST_SEQUENCE_ID;
+    private final Object lock = new Object();
+    private final AtomicLong inflightLogCount = new AtomicLong();
+
+    LogWriter(PrintWriter printWriter) {
+      this.logs = new HashMap<>();
+      this.printWriter = printWriter;
+      this.writerThread = new Thread(new WriterTask());
+    }
+
+    void start() {
+      writerThread.start();
+    }
+
+    public void log(ArrayList<String> msg, long sequenceId) {
+      synchronized (lock) {
+        if (!stop) {
+          logs.put(sequenceId, msg);
+          inflightLogCount.incrementAndGet();
+          lock.notify();
+        }
+      }
+    }
+
+    private final class WriterTask implements Runnable {
+      public void run() {
+        try {
+          while (!stop) {
+            synchronized (lock) {
+              // The sequenceId is incrementally generated as the RocksDB
+              // iterator. Thus, based on the sequenceId, we can strictly ensure
+              // that the output order here is consistent with the order of the
+              // RocksDB iterator.
+              // Note that the order here not only requires the sequenceId to be
+              // incremental, but also demands that the sequenceId of the
+              // next output is the current sequenceId + 1.
+              ArrayList<String> results = logs.get(expectedSequenceId);
+              if (results != null) {
+                for (String result : results) {
+                  printWriter.println(result);
+                }
+                inflightLogCount.decrementAndGet();
+                logs.remove(expectedSequenceId);
+                // sequenceId of the next output must be the current
+                // sequenceId + 1
+                expectedSequenceId++;
+              } else {
+                lock.wait(1000);
+              }
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.error("Exception while output", e);
+        } finally {
+          stop = true;
+          drainRemainingMessages();

Review Comment:
   Yes, this was missing synchronization. I have added a lock for it.
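   
   Here is a minimal sketch of the fix. It assumes the drain step prints whichever consecutive batches are already buffered. The shutdown path takes the same `lock` as `log()`, so the non-thread-safe `HashMap` is never mutated concurrently. `OrderedLog`, `printReady`, and `stopAndDrain` are hypothetical names, and the PR's `drainRemainingMessages()` may behave differently.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   final class OrderedLog {
     private final Object lock = new Object();
     private final Map<Long, List<String>> logs = new HashMap<>(); // guarded by lock
     private final List<String> printed = new ArrayList<>();        // guarded by lock
     private long expectedSeq = 0;                                  // guarded by lock
     private boolean stopped = false;                               // guarded by lock

     /** Called by worker threads with a finished batch. */
     void log(long seq, List<String> batch) {
       synchronized (lock) {
         if (!stopped) {
           logs.put(seq, batch);
           lock.notifyAll();
         }
       }
     }

     /** Emits consecutive batches starting at expectedSeq. Caller holds lock. */
     private void printReady() {
       List<String> batch;
       while ((batch = logs.remove(expectedSeq)) != null) {
         printed.addAll(batch);
         expectedSeq++;
       }
     }

     /** Shutdown path: uses the same lock as log(), unlike an unsynchronized drain. */
     void stopAndDrain() {
       synchronized (lock) {
         stopped = true;
         printReady();
         // Batches after a gap cannot be printed in order; this sketch drops them.
         logs.clear();
       }
     }

     List<String> printed() {
       synchronized (lock) {
         return new ArrayList<>(printed);
       }
     }
   }
   ```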





Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1350320402


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter, boolean schemaV3) {
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        threadCount, threadCount, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(1024), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdownNow();
+      logWriter.stop();
+      logWriter.join();
+      // End JSON object (map) or array
+      printWriter.println(withKey ? " }" : " ]");
+    }
+    return !exception;
+  }
 
+  private void processRecords(ManagedRocksIterator iterator,
+                              DBColumnFamilyDefinition dbColumnFamilyDef,
+                              LogWriter logWriter, ExecutorService threadPool,
+                              boolean schemaV3) throws InterruptedException {
     if (startKey != null) {
       iterator.get().seek(getValueObject(dbColumnFamilyDef));
     }
-
-    if (withKey) {
-      // Start JSON object (map)
-      out.print("{ ");
-    } else {
-      // Start JSON array
-      out.print("[ ");
-    }
-
+    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(batchSize);
+    // Used to ensure that the output of a multi-threaded parsed Json is in
+    // the same order as the RocksDB iterator.
+    long sequenceId = FIRST_SEQUENCE_ID;
     // Count number of keys printed so far
     long count = 0;
-    while (withinLimit(count) && iterator.get().isValid()) {
-      StringBuilder sb = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDef.getKeyCodec()
-            .fromPersistedFormat(iterator.get().key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        if (schemaV3) {
-          int index =
-              DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
-          String keyStr = key.toString();
-          if (index > keyStr.length()) {
-            err().println("Error: Invalid SchemaV3 table key length. "
-                + "Is this a V2 table? Try again with --dn-schema=V2");
-            return false;
-          }
-          String cid = keyStr.substring(0, index);
-          String blockId = keyStr.substring(index);
-          sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
-              FixedLengthStringCodec.string2Bytes(cid)) +
-              keySeparatorSchemaV3 +
-              blockId));
-        } else {
-          sb.append(gson.toJson(key));
-        }
-        sb.append(": ");
-      }
-
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      Object o = dbColumnFamilyDef.getValueCodec()
-          .fromPersistedFormat(iterator.get().value());
-      sb.append(gson.toJson(o));
-
+    List<Future<Void>> futures = new ArrayList<>();
+    while (withinLimit(count) && iterator.get().isValid() && !exception) {
+      batch.add(new ByteArrayKeyValue(
+          iterator.get().key(), iterator.get().value()));
       iterator.get().next();
-      ++count;
-      if (withinLimit(count) && iterator.get().isValid()) {
-        // If this is not the last entry, append comma
-        sb.append(", ");
+      count++;
+      if (batch.size() >= batchSize) {
+        while (logWriter.getInflightLogCount() > threadCount * 10L

Review Comment:
   OK, so the in-flight queue is controlled here, since that control was not present before.





Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1343932443


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -114,9 +131,18 @@ public class DBScanner implements Callable<Void>, SubcommandWithParent {
       showDefaultValue = CommandLine.Help.Visibility.ALWAYS)
   private boolean showCount;
 
-  private String keySeparatorSchemaV3 =
+  @CommandLine.Option(names = {"--compact"},
+      description = "disable the pretty print the output",
+      defaultValue = "false")
+  private static boolean compact;
+
+  private static final String KEY_SEPARATOR_SCHEMA_V3 =
       new OzoneConfiguration().getObject(DatanodeConfiguration.class)
           .getContainerSchemaV3KeySeparator();
+  private static final int BATCH_SIZE = 10000;
+  private static final int THREAD_COUNT = 10;

Review Comment:
   Yes, we could make these parameters configurable. I didn't, because I found that once `THREAD_COUNT` (the number of threads) exceeds 4, these parameters have little further impact on performance.





Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "kerneltime (via GitHub)" <gi...@apache.org>.
kerneltime commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1342891134


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -114,9 +131,18 @@ public class DBScanner implements Callable<Void>, SubcommandWithParent {
       showDefaultValue = CommandLine.Help.Visibility.ALWAYS)
   private boolean showCount;
 
-  private String keySeparatorSchemaV3 =
+  @CommandLine.Option(names = {"--compact"},
+      description = "disable the pretty print the output",
+      defaultValue = "false")
+  private static boolean compact;
+
+  private static final String KEY_SEPARATOR_SCHEMA_V3 =
       new OzoneConfiguration().getObject(DatanodeConfiguration.class)
           .getContainerSchemaV3KeySeparator();
+  private static final int BATCH_SIZE = 10000;
+  private static final int THREAD_COUNT = 10;

Review Comment:
   Why not make these optional params as well?





[GitHub] [ozone] sumitagrawl commented on a diff in pull request #5358: HDDS-9348. Improving parsing speed of DB Scanner

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1336727694


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(

Review Comment:
   The `threadPool` construction can be moved into the method where it is used and shut down.
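   
   This JDK-only sketch illustrates the suggestion. The pool is created, used, and shut down inside one method, so its lifetime is visible in one place. A lambda `ThreadFactory` stands in for Guava's `ThreadFactoryBuilder().setNameFormat("DBScanner-%d")`. `ScopedPool.squareAll` is a hypothetical task used only for illustration.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   final class ScopedPool {
     static List<Integer> squareAll(List<Integer> xs, int threads)
         throws InterruptedException, ExecutionException {
       AtomicInteger n = new AtomicInteger();
       // Names threads "DBScanner-0", "DBScanner-1", ...
       ThreadFactory factory = r -> new Thread(r, "DBScanner-" + n.getAndIncrement());
       ExecutorService pool = Executors.newFixedThreadPool(threads, factory);
       try {
         List<Future<Integer>> futures = new ArrayList<>();
         for (int x : xs) {
           futures.add(pool.submit(() -> x * x)); // hypothetical per-record work
         }
         List<Integer> results = new ArrayList<>();
         for (Future<Integer> f : futures) {
           results.add(f.get());
         }
         return results;
       } finally {
         // The pool never outlives this method.
         pool.shutdown();
         if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow();
         }
       }
     }
   }
   ```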



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+          err().println("Thread pool did not terminate; forced shutdown.");
+        }
+      } catch (InterruptedException ie) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      logWriter.stop();
+      logWriter.join();
+      // End JSON object (map) or array
+      printWriter.println(withKey ? " }" : " ]");
+    }
+
+    return !exception;
+  }
 
+  private void processRecords(ManagedRocksIterator iterator,
+                                 DBColumnFamilyDefinition dbColumnFamilyDef,
+                                 LogWriter logWriter,
+                                 ExecutorService threadPool,
+                                 boolean schemaV3) throws InterruptedException {
     if (startKey != null) {
       iterator.get().seek(getValueObject(dbColumnFamilyDef));
     }
-
-    if (withKey) {
-      // Start JSON object (map)
-      out.print("{ ");
-    } else {
-      // Start JSON array
-      out.print("[ ");
-    }
-
+    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(BATCH_SIZE);
+    // Used to ensure that the output of a multi-threaded parsed Json is in
+    // the same order as the RocksDB iterator.
+    long sequenceId = FIRST_SEQUENCE_ID;
     // Count number of keys printed so far
     long count = 0;
-    while (withinLimit(count) && iterator.get().isValid()) {
-      StringBuilder sb = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDef.getKeyCodec()
-            .fromPersistedFormat(iterator.get().key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        if (schemaV3) {
-          int index =
-              DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
-          String keyStr = key.toString();
-          if (index > keyStr.length()) {
-            err().println("Error: Invalid SchemaV3 table key length. "
-                + "Is this a V2 table? Try again with --dn-schema=V2");
-            return false;
-          }
-          String cid = keyStr.substring(0, index);
-          String blockId = keyStr.substring(index);
-          sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
-              FixedLengthStringCodec.string2Bytes(cid)) +
-              keySeparatorSchemaV3 +
-              blockId));
-        } else {
-          sb.append(gson.toJson(key));
-        }
-        sb.append(": ");
-      }
-
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      Object o = dbColumnFamilyDef.getValueCodec()
-          .fromPersistedFormat(iterator.get().value());
-      sb.append(gson.toJson(o));
-
+    List<Future<Void>> futures = new ArrayList<>();
+    while (withinLimit(count) && iterator.get().isValid() && !exception) {
+      batch.add(new ByteArrayKeyValue(
+          iterator.get().key(), iterator.get().value()));
       iterator.get().next();
-      ++count;
-      if (withinLimit(count) && iterator.get().isValid()) {
-        // If this is not the last entry, append comma
-        sb.append(", ");
+      count++;
+      if (batch.size() >= BATCH_SIZE) {
+        //
+        while (logWriter.getInflightLogCount() > THREAD_COUNT * 10

Review Comment:
   This check may not bound memory in all cases:
   - Tasks already submitted to the thread pool, whether still queued or executing, are not counted.
   It should account for both the batches pending in the Writer and the tasks in the thread pool.
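One way to make the bound cover every stage is sketched below. It assumes a hypothetical `BoundedSubmitter` helper that is not part of the PR. The reader thread takes a `Semaphore` permit before submitting a batch, and the permit is returned only when that batch's task has finished. The sketch also assumes the submitted `Runnable` both parses and writes its batch. With a separate writer thread such as `LogWriter`, the release would have to move to the point where the writer drains the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper: caps the number of batches that are queued in the
 * pool, executing, or (if the Runnable includes the write) being written.
 */
class BoundedSubmitter {
  private final ExecutorService pool;
  private final Semaphore inflight;

  BoundedSubmitter(ExecutorService pool, int maxInflight) {
    this.pool = pool;
    this.inflight = new Semaphore(maxInflight);
  }

  /** Blocks the calling (reader) thread while maxInflight batches are outstanding. */
  Future<?> submit(Runnable parseAndWrite) throws InterruptedException {
    inflight.acquire();
    try {
      return pool.submit(() -> {
        try {
          parseAndWrite.run();
        } finally {
          // Release only when the batch is fully done, so queued and
          // running tasks both count against the limit.
          inflight.release();
        }
      });
    } catch (RuntimeException e) {
      inflight.release(); // submission rejected: return the permit
      throw e;
    }
  }

  int availablePermits() {
    return inflight.availablePermits();
  }
}
```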



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+          err().println("Thread pool did not terminate; forced shutdown.");
+        }
+      } catch (InterruptedException ie) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      logWriter.stop();

Review Comment:
   This should be done before `Thread.currentThread().interrupt()`, because once the interrupt flag is set, this call may throw a further exception. Otherwise, `Thread.currentThread().interrupt()` must be the last statement in the `finally` block.
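Below is a minimal sketch of the ordering the reviewer suggests. It uses a hypothetical `ShutdownOrdering.shutdownAndCleanup` helper, not code from the PR. An interrupt caught while waiting for the pool is only recorded at first. The remaining cleanup (for example, stopping and joining a writer thread) then runs with the flag still clear. The flag is restored as the very last step.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownOrdering {
  /**
   * Hypothetical cleanup routine. Returns true if an interrupt was observed
   * (it is re-asserted on the current thread before returning).
   */
  static boolean shutdownAndCleanup(ExecutorService pool, Runnable cleanup) {
    boolean interrupted = false;
    pool.shutdown();
    try {
      if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        pool.shutdownNow();
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      interrupted = true; // remember it, but do not set the flag yet
    }
    try {
      // Blocking calls such as Thread.join() would throw immediately here
      // if the interrupt flag were already set.
      cleanup.run();
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restored as the last statement
      }
    }
    return interrupted;
  }
}
```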



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {

Review Comment:
   IMO, we can shut down the thread pool immediately and then stop the log writer. If we need to wait for tasks to complete, we can call `Future.get()` with a timeout.
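Below is a sketch of that alternative, using a hypothetical `FutureDrain.drain` helper. It assumes the caller keeps the `Future`s returned by `submit`, as the `futures` list in `processRecords` suggests. Each future is awaited against one shared deadline. Failures and timeouts count as errors. The pool is then stopped with `shutdownNow()` instead of waiting in `awaitTermination`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDrain {
  /**
   * Hypothetical helper: waits for submitted tasks until a shared deadline,
   * then shuts the pool down immediately. Returns true only if every task
   * completed without throwing.
   */
  static boolean drain(ExecutorService pool, List<Future<Void>> futures,
                       long timeout, TimeUnit unit) {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    boolean ok = true;
    boolean interrupted = false;
    try {
      for (Future<Void> f : futures) {
        long remaining = Math.max(0L, deadline - System.nanoTime());
        try {
          f.get(remaining, TimeUnit.NANOSECONDS);
        } catch (ExecutionException | TimeoutException e) {
          ok = false;
          f.cancel(true); // no-op if the task already finished
        }
      }
    } catch (InterruptedException e) {
      ok = false;
      interrupted = true;
    } finally {
      pool.shutdownNow(); // do not wait: interrupt whatever is still running
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the flag last
    }
    return ok;
  }
}
```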



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -364,4 +416,252 @@ private String removeTrailingSlashIfNeeded(String dbPath) {
   public Class<?> getParentType() {
     return RDBParser.class;
   }
+
+  /**
+   * Utility for centralized JSON serialization using Jackson.
+   */
+  @VisibleForTesting
+  public static class JsonSerializationHelper {
+    /**
+     * In order to maintain consistency with the original Gson output to do
+     * this setup makes the output from Jackson closely match the
+     * output of Gson.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+        // Ignore standard getters.
+        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+        // Ignore boolean "is" getters.
+        .setVisibility(PropertyAccessor.IS_GETTER,
+            JsonAutoDetect.Visibility.NONE)
+        // Exclude null values.
+        .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    public static final ObjectWriter WRITER;
+
+    static {
+      if (compact) {
+        WRITER = OBJECT_MAPPER.writer();
+      } else {
+        WRITER = OBJECT_MAPPER.writerWithDefaultPrettyPrinter();
+      }
+    }
+
+    public static ObjectWriter getWriter() {
+      return WRITER;
+    }
+  }
+
+
+  private static class Task implements Callable<Void> {
+
+    private final DBColumnFamilyDefinition dbColumnFamilyDefinition;
+    private final ArrayList<ByteArrayKeyValue> batch;
+    private final LogWriter logWriter;
+    private static final ObjectWriter WRITER =
+        JsonSerializationHelper.getWriter();
+    private final long sequenceId;
+    private final boolean withKey;
+    private final boolean schemaV3;
+
+    Task(DBColumnFamilyDefinition dbColumnFamilyDefinition,
+         ArrayList<ByteArrayKeyValue> batch, LogWriter logWriter,
+         long sequenceId, boolean withKey, boolean schemaV3) {
+      this.dbColumnFamilyDefinition = dbColumnFamilyDefinition;
+      this.batch = batch;
+      this.logWriter = logWriter;
+      this.sequenceId = sequenceId;
+      this.withKey = withKey;
+      this.schemaV3 = schemaV3;
+    }
+
+    @Override
+    public Void call() {
+      try {
+        ArrayList<String> results = new ArrayList<>(batch.size());
+        for (ByteArrayKeyValue byteArrayKeyValue : batch) {
+          StringBuilder sb = new StringBuilder();
+          if (sequenceId == FIRST_SEQUENCE_ID && !results.isEmpty()) {

Review Comment:
   The comment and the `if` condition do not match: for the first sequence no comma should be appended, but the check seems reversed.
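For reference, here is a sketch of the intended rule. It matches the condition in the later revision of `Task.call()`, which is `!(sequenceId == FIRST_SEQUENCE_ID && results.isEmpty())`. Only the very first entry of the first batch is written without a leading `", "`. The `CommaJoin` class and the value of `FIRST_SEQUENCE_ID` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CommaJoin {
  static final long FIRST_SEQUENCE_ID = 0L; // assumed value, for illustration

  /** Prefixes ", " to every entry except the first entry of the first batch. */
  static List<String> formatBatch(long sequenceId, List<String> values) {
    List<String> results = new ArrayList<>(values.size());
    for (String v : values) {
      boolean firstOverall = sequenceId == FIRST_SEQUENCE_ID && results.isEmpty();
      results.add(firstOverall ? v : ", " + v);
    }
    return results;
  }
}
```

Concatenating the formatted batches in sequence order then yields a valid comma-separated body between the opening `{ `/`[ ` and the closing ` }`/` ]`.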





Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "kerneltime (via GitHub)" <gi...@apache.org>.
kerneltime commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1342892731


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -114,9 +131,18 @@ public class DBScanner implements Callable<Void>, SubcommandWithParent {
       showDefaultValue = CommandLine.Help.Visibility.ALWAYS)
   private boolean showCount;
 
-  private String keySeparatorSchemaV3 =
+  @CommandLine.Option(names = {"--compact"},
+      description = "disable the pretty print the output",
+      defaultValue = "false")
+  private static boolean compact;
+
+  private static final String KEY_SEPARATOR_SCHEMA_V3 =
       new OzoneConfiguration().getObject(DatanodeConfiguration.class)
           .getContainerSchemaV3KeySeparator();
+  private static final int BATCH_SIZE = 10000;
+  private static final int THREAD_COUNT = 10;

Review Comment:
   Since this change aims to speed up the scan, the thread count and batch size could be made configurable parameters instead of constants.
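If the constants became options, the pool could be built from the supplied values. The sketch below uses a hypothetical `ScanSettings` holder; the class, its fields, and the `queueCapacity` argument are assumptions, not the PR's code. It also gives the `LinkedBlockingQueue` a capacity and keeps `CallerRunsPolicy`. When the queue is full, the reader thread parses the batch itself instead of queueing more work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical settings that would replace the BATCH_SIZE / THREAD_COUNT constants. */
class ScanSettings {
  final int threadCount;
  final int batchSize;

  ScanSettings(int threadCount, int batchSize) {
    if (threadCount < 1 || batchSize < 1) {
      throw new IllegalArgumentException("threadCount and batchSize must be >= 1");
    }
    this.threadCount = threadCount;
    this.batchSize = batchSize;
  }

  /**
   * Fixed-size pool with a bounded queue; once the queue is full,
   * CallerRunsPolicy runs the task on the submitting (reader) thread,
   * which throttles further reads from the iterator.
   */
  ThreadPoolExecutor newPool(int queueCapacity) {
    return new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(queueCapacity),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }
}
```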





Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1351354457


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,102 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter, boolean schemaV3) {
+    exception = false;
+    if (printWriter == null) {

Review Comment:
   Done.





[GitHub] [ozone] xichen01 commented on a diff in pull request #5358: HDDS-9348. Improving parsing speed of DB Scanner

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1337139269


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -364,4 +416,252 @@ private String removeTrailingSlashIfNeeded(String dbPath) {
   public Class<?> getParentType() {
     return RDBParser.class;
   }
+
+  /**
+   * Utility for centralized JSON serialization using Jackson.
+   */
+  @VisibleForTesting
+  public static class JsonSerializationHelper {
+    /**
+     * In order to maintain consistency with the original Gson output to do
+     * this setup makes the output from Jackson closely match the
+     * output of Gson.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+        // Ignore standard getters.
+        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+        // Ignore boolean "is" getters.
+        .setVisibility(PropertyAccessor.IS_GETTER,
+            JsonAutoDetect.Visibility.NONE)
+        // Exclude null values.
+        .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    public static final ObjectWriter WRITER;
+
+    static {
+      if (compact) {
+        WRITER = OBJECT_MAPPER.writer();
+      } else {
+        WRITER = OBJECT_MAPPER.writerWithDefaultPrettyPrinter();
+      }
+    }
+
+    public static ObjectWriter getWriter() {
+      return WRITER;
+    }
+  }
+
+
+  private static class Task implements Callable<Void> {
+
+    private final DBColumnFamilyDefinition dbColumnFamilyDefinition;
+    private final ArrayList<ByteArrayKeyValue> batch;
+    private final LogWriter logWriter;
+    private static final ObjectWriter WRITER =
+        JsonSerializationHelper.getWriter();
+    private final long sequenceId;
+    private final boolean withKey;
+    private final boolean schemaV3;
+
+    Task(DBColumnFamilyDefinition dbColumnFamilyDefinition,
+         ArrayList<ByteArrayKeyValue> batch, LogWriter logWriter,
+         long sequenceId, boolean withKey, boolean schemaV3) {
+      this.dbColumnFamilyDefinition = dbColumnFamilyDefinition;
+      this.batch = batch;
+      this.logWriter = logWriter;
+      this.sequenceId = sequenceId;
+      this.withKey = withKey;
+      this.schemaV3 = schemaV3;
+    }
+
+    @Override
+    public Void call() {
+      try {
+        ArrayList<String> results = new ArrayList<>(batch.size());
+        for (ByteArrayKeyValue byteArrayKeyValue : batch) {
+          StringBuilder sb = new StringBuilder();
+          if (sequenceId == FIRST_SEQUENCE_ID && !results.isEmpty()) {

Review Comment:
   This has been fixed.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(

Review Comment:
   Done.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {

Review Comment:
   Changed to shut down directly with `threadPool.shutdownNow()`.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+          err().println("Thread pool did not terminate; forced shutdown.");
+        }
+      } catch (InterruptedException ie) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      logWriter.stop();

Review Comment:
   Done.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+          err().println("Thread pool did not terminate; forced shutdown.");
+        }
+      } catch (InterruptedException ie) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      logWriter.stop();
+      logWriter.join();
+      // End JSON object (map) or array
+      printWriter.println(withKey ? " }" : " ]");
+    }
+
+    return !exception;
+  }
 
+  private void processRecords(ManagedRocksIterator iterator,
+                                 DBColumnFamilyDefinition dbColumnFamilyDef,
+                                 LogWriter logWriter,
+                                 ExecutorService threadPool,
+                                 boolean schemaV3) throws InterruptedException {
     if (startKey != null) {
       iterator.get().seek(getValueObject(dbColumnFamilyDef));
     }
-
-    if (withKey) {
-      // Start JSON object (map)
-      out.print("{ ");
-    } else {
-      // Start JSON array
-      out.print("[ ");
-    }
-
+    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(BATCH_SIZE);
+    // Used to ensure that the output of a multi-threaded parsed Json is in
+    // the same order as the RocksDB iterator.
+    long sequenceId = FIRST_SEQUENCE_ID;
     // Count number of keys printed so far
     long count = 0;
-    while (withinLimit(count) && iterator.get().isValid()) {
-      StringBuilder sb = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDef.getKeyCodec()
-            .fromPersistedFormat(iterator.get().key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        if (schemaV3) {
-          int index =
-              DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
-          String keyStr = key.toString();
-          if (index > keyStr.length()) {
-            err().println("Error: Invalid SchemaV3 table key length. "
-                + "Is this a V2 table? Try again with --dn-schema=V2");
-            return false;
-          }
-          String cid = keyStr.substring(0, index);
-          String blockId = keyStr.substring(index);
-          sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
-              FixedLengthStringCodec.string2Bytes(cid)) +
-              keySeparatorSchemaV3 +
-              blockId));
-        } else {
-          sb.append(gson.toJson(key));
-        }
-        sb.append(": ");
-      }
-
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      Object o = dbColumnFamilyDef.getValueCodec()
-          .fromPersistedFormat(iterator.get().value());
-      sb.append(gson.toJson(o));
-
+    List<Future<Void>> futures = new ArrayList<>();
+    while (withinLimit(count) && iterator.get().isValid() && !exception) {
+      batch.add(new ByteArrayKeyValue(
+          iterator.get().key(), iterator.get().value()));
       iterator.get().next();
-      ++count;
-      if (withinLimit(count) && iterator.get().isValid()) {
-        // If this is not the last entry, append comma
-        sb.append(", ");
+      count++;
+      if (batch.size() >= BATCH_SIZE) {
+        //
+        while (logWriter.getInflightLogCount() > THREAD_COUNT * 10

Review Comment:
   Added a bound to the thread pool's queue.
   - The `LinkedBlockingQueue` in the thread pool is now bounded, to prevent too many tasks from accumulating in the pool.
   - `getInflightLogCount()` is used to prevent `LogWriter` from accumulating too many pending batches.
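The ordering and the in-flight count can be illustrated with a simplified, hypothetical `OrderedBatchWriter`, which is not the PR's `LogWriter`. Parser threads hand in batches tagged with their sequence id, in any order. A batch is emitted only after every earlier id has been written. Unlike `LogWriter`, which runs on its own thread, this sketch writes inline under a lock and appends to a `StringBuilder` instead of a `PrintWriter`. Here, `pendingCount()` plays the role of `getInflightLogCount()`.

```java
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical reorder buffer: accepts batches out of order and emits them
 * strictly by sequence id. Thread-safe via synchronized methods.
 */
class OrderedBatchWriter {
  private final TreeMap<Long, List<String>> pending = new TreeMap<>();
  private final StringBuilder out = new StringBuilder(); // stands in for the PrintWriter
  private long nextSequenceId;

  OrderedBatchWriter(long firstSequenceId) {
    this.nextSequenceId = firstSequenceId;
  }

  /** Buffers the batch, then flushes every batch that is now contiguous. */
  synchronized void log(List<String> batch, long sequenceId) {
    pending.put(sequenceId, batch);
    while (!pending.isEmpty() && pending.firstKey() == nextSequenceId) {
      for (String s : pending.pollFirstEntry().getValue()) {
        out.append(s);
      }
      nextSequenceId++;
    }
  }

  /** Batches parsed but not yet written; a reader could throttle on this. */
  synchronized int pendingCount() {
    return pending.size();
  }

  synchronized String output() {
    return out.toString();
  }
}
```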





Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl merged PR #5358:
URL: https://github.com/apache/ozone/pull/5358




Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1347479100


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();

Review Comment:
   Should `System.out` be closed here? `out()` uses `System.out` internally, so I think it should not be closed.
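Below is a minimal sketch of closing only what was opened, using a hypothetical `OutputTarget.withOutput` helper. It assumes stdout is reached through `System.out`, as the comment describes. Try-with-resources manages the file writer, while the stdout writer is only flushed.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Consumer;

class OutputTarget {
  /** Runs body against a file if fileName is set, else against stdout; closes only the file. */
  static void withOutput(String fileName, Consumer<PrintWriter> body) throws IOException {
    if (fileName == null) {
      PrintWriter stdout = new PrintWriter(
          new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
      body.accept(stdout);
      stdout.flush(); // flush buffered output, but never close System.out
      return;
    }
    try (PrintWriter file = new PrintWriter(
        Files.newBufferedWriter(Paths.get(fileName), StandardCharsets.UTF_8))) {
      body.accept(file);
    } // closing here also closes the underlying file
  }
}
```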



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -364,4 +410,253 @@ private String removeTrailingSlashIfNeeded(String dbPath) {
   public Class<?> getParentType() {
     return RDBParser.class;
   }
+
+  /**
+   * Utility for centralized JSON serialization using Jackson.
+   */
+  @VisibleForTesting
+  public static class JsonSerializationHelper {
+    /**
+     * In order to maintain consistency with the original Gson output to do
+     * this setup makes the output from Jackson closely match the
+     * output of Gson.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+        // Ignore standard getters.
+        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+        // Ignore boolean "is" getters.
+        .setVisibility(PropertyAccessor.IS_GETTER,
+            JsonAutoDetect.Visibility.NONE)
+        // Exclude null values.
+        .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    public static final ObjectWriter WRITER;
+
+    static {
+      if (compact) {
+        WRITER = OBJECT_MAPPER.writer();
+      } else {
+        WRITER = OBJECT_MAPPER.writerWithDefaultPrettyPrinter();
+      }
+    }
+
+    public static ObjectWriter getWriter() {
+      return WRITER;
+    }
+  }
+
+
+  private static class Task implements Callable<Void> {
+
+    private final DBColumnFamilyDefinition dbColumnFamilyDefinition;
+    private final ArrayList<ByteArrayKeyValue> batch;
+    private final LogWriter logWriter;
+    private static final ObjectWriter WRITER =
+        JsonSerializationHelper.getWriter();
+    private final long sequenceId;
+    private final boolean withKey;
+    private final boolean schemaV3;
+
+    Task(DBColumnFamilyDefinition dbColumnFamilyDefinition,
+         ArrayList<ByteArrayKeyValue> batch, LogWriter logWriter,
+         long sequenceId, boolean withKey, boolean schemaV3) {
+      this.dbColumnFamilyDefinition = dbColumnFamilyDefinition;
+      this.batch = batch;
+      this.logWriter = logWriter;
+      this.sequenceId = sequenceId;
+      this.withKey = withKey;
+      this.schemaV3 = schemaV3;
+    }
+
+    @Override
+    public Void call() {
+      try {
+        ArrayList<String> results = new ArrayList<>(batch.size());
+        for (ByteArrayKeyValue byteArrayKeyValue : batch) {
+          StringBuilder sb = new StringBuilder();
+          if (!(sequenceId == FIRST_SEQUENCE_ID && results.isEmpty())) {
+            // Add a comma before each output entry, starting from the second
+            // one, to ensure valid JSON format.
+            sb.append(", ");
+          }
+          if (withKey) {
+            Object key = dbColumnFamilyDefinition.getKeyCodec()
+                .fromPersistedFormat(byteArrayKeyValue.getKey());
+            if (schemaV3) {
+              int index =
+                  DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
+              String keyStr = key.toString();
+              if (index > keyStr.length()) {
+                err().println("Error: Invalid SchemaV3 table key length. "
+                    + "Is this a V2 table? Try again with --dn-schema=V2");
+                exception = true;
+                break;
+              }
+              String cid = key.toString().substring(0, index);
+              String blockId = key.toString().substring(index);
+              sb.append(WRITER.writeValueAsString(LongCodec.get()
+                  .fromPersistedFormat(
+                      FixedLengthStringCodec.string2Bytes(cid)) +
+                  KEY_SEPARATOR_SCHEMA_V3 + blockId));
+            } else {
+              sb.append(WRITER.writeValueAsString(key));
+            }
+            sb.append(": ");
+          }
+
+          Object o = dbColumnFamilyDefinition.getValueCodec()
+              .fromPersistedFormat(byteArrayKeyValue.getValue());
+          sb.append(WRITER.writeValueAsString(o));
+          results.add(sb.toString());
+        }
+        logWriter.log(results, sequenceId);
+      } catch (Exception e) {
+        exception = true;
+        LOG.error("Exception parse Object", e);
+      }
+      return null;
+    }
+  }
+
+  private static class ByteArrayKeyValue {
+    private final byte[] key;
+    private final byte[] value;
+
+    ByteArrayKeyValue(byte[] key, byte[] value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public byte[] getKey() {
+      return key;
+    }
+
+    public byte[] getValue() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ByteArrayKeyValue that = (ByteArrayKeyValue) o;
+      return Arrays.equals(key, that.key);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(key);
+    }
+
+    @Override
+    public String toString() {
+      return "ByteArrayKeyValue{" +
+          "key=" + Arrays.toString(key) +
+          ", value=" + Arrays.toString(value) +
+          '}';
+    }
+  }
+
+  private static class LogWriter {
+    private final Map<Long, ArrayList<String>> logs;
+    private final PrintWriter printWriter;
+    private final Thread writerThread;
+    private volatile boolean stop = false;
+    private long expectedSequenceId = FIRST_SEQUENCE_ID;
+    private final Object lock = new Object();
+    private final AtomicLong inflightLogCount = new AtomicLong();
+
+    LogWriter(PrintWriter printWriter) {
+      this.logs = new HashMap<>();
+      this.printWriter = printWriter;
+      this.writerThread = new Thread(new WriterTask());
+    }
+
+    void start() {
+      writerThread.start();
+    }
+
+    public void log(ArrayList<String> msg, long sequenceId) {
+      synchronized (lock) {
+        if (!stop) {
+          logs.put(sequenceId, msg);
+          inflightLogCount.incrementAndGet();
+          lock.notify();
+        }
+      }
+    }
+
+    private final class WriterTask implements Runnable {
+      public void run() {
+        try {
+          while (!stop) {
+            synchronized (lock) {
+              // The sequenceId is incrementally generated as the RocksDB
+              // iterator. Thus, based on the sequenceId, we can strictly ensure
+              // that the output order here is consistent with the order of the
+              // RocksDB iterator.
+              // Note that the order here not only requires the sequenceId to be
+              // incremental, but also demands that the sequenceId of the
+              // next output is the current sequenceId + 1.
+              ArrayList<String> results = logs.get(expectedSequenceId);
+              if (results != null) {
+                for (String result : results) {
+                  printWriter.println(result);
+                }
+                inflightLogCount.decrementAndGet();
+                logs.remove(expectedSequenceId);
+                // sequenceId of the next output must be the current
+                // sequenceId + 1
+                expectedSequenceId++;
+              } else {
+                lock.wait(1000);
+              }
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.error("Exception while output", e);
+        } finally {
+          stop = true;
+          drainRemainingMessages();

Review Comment:
   This needs to execute inside the lock, as the logs map is not thread-safe.
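The following is a minimal sketch of the reviewer's point. It assumes a simplified writer: the class `OrderedLogWriter` and its methods are hypothetical and are not the PR's `LogWriter`. Every access to the sequence-ordered reorder map happens while holding the same lock, and that includes the final drain after the loop exits.

```java
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: a reorder buffer whose HashMap is only touched while
 * holding `lock`, including the final drain after the writer loop exits.
 */
class OrderedLogWriter {
  private final Map<Long, List<String>> pending = new HashMap<>();
  private final Object lock = new Object();
  private final PrintWriter out;
  private long expectedSequenceId;   // guarded by lock
  private boolean stopped = false;   // guarded by lock

  OrderedLogWriter(PrintWriter out, long firstSequenceId) {
    this.out = out;
    this.expectedSequenceId = firstSequenceId;
  }

  /** Called by parser threads; batches may arrive out of order. */
  void log(List<String> batch, long sequenceId) {
    synchronized (lock) {
      if (!stopped) {
        pending.put(sequenceId, batch);
        lock.notifyAll();
      }
    }
  }

  /** Prints every batch that is next in sequence. Caller must hold lock. */
  private void flushReadyLocked() {
    List<String> batch;
    while ((batch = pending.remove(expectedSequenceId)) != null) {
      for (String line : batch) {
        out.println(line);
      }
      expectedSequenceId++;
    }
  }

  /** Writer-thread body: flush contiguous batches until stop() is called. */
  void runUntilStopped() throws InterruptedException {
    synchronized (lock) {
      while (!stopped) {
        flushReadyLocked();
        lock.wait(1000);  // releases the lock so parser threads can log()
      }
      // The final drain stays in the same synchronized block, so no parser
      // thread can mutate `pending` while it is being emptied.
      flushReadyLocked();
    }
  }

  void stop() {
    synchronized (lock) {
      stopped = true;
      lock.notifyAll();
    }
  }
}
```

Parser threads call `log` with each batch's sequence id, and the writer thread runs `runUntilStopped`. Batches that never become contiguous (for example, because a parse task failed) stay in the map rather than being printed out of order.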



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter, boolean schemaV3) {
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        threadCount, threadCount, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(1024), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdownNow();
+      logWriter.stop();
+      logWriter.join();
+      // End JSON object (map) or array
+      printWriter.println(withKey ? " }" : " ]");
+    }
+    return !exception;
+  }
 
+  private void processRecords(ManagedRocksIterator iterator,
+                              DBColumnFamilyDefinition dbColumnFamilyDef,
+                              LogWriter logWriter, ExecutorService threadPool,
+                              boolean schemaV3) throws InterruptedException {
     if (startKey != null) {
       iterator.get().seek(getValueObject(dbColumnFamilyDef));
     }
-
-    if (withKey) {
-      // Start JSON object (map)
-      out.print("{ ");
-    } else {
-      // Start JSON array
-      out.print("[ ");
-    }
-
+    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(batchSize);
+    // Used to ensure that the output of a multi-threaded parsed Json is in
+    // the same order as the RocksDB iterator.
+    long sequenceId = FIRST_SEQUENCE_ID;
     // Count number of keys printed so far
     long count = 0;
-    while (withinLimit(count) && iterator.get().isValid()) {
-      StringBuilder sb = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDef.getKeyCodec()
-            .fromPersistedFormat(iterator.get().key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        if (schemaV3) {
-          int index =
-              DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
-          String keyStr = key.toString();
-          if (index > keyStr.length()) {
-            err().println("Error: Invalid SchemaV3 table key length. "
-                + "Is this a V2 table? Try again with --dn-schema=V2");
-            return false;
-          }
-          String cid = keyStr.substring(0, index);
-          String blockId = keyStr.substring(index);
-          sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
-              FixedLengthStringCodec.string2Bytes(cid)) +
-              keySeparatorSchemaV3 +
-              blockId));
-        } else {
-          sb.append(gson.toJson(key));
-        }
-        sb.append(": ");
-      }
-
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      Object o = dbColumnFamilyDef.getValueCodec()
-          .fromPersistedFormat(iterator.get().value());
-      sb.append(gson.toJson(o));
-
+    List<Future<Void>> futures = new ArrayList<>();
+    while (withinLimit(count) && iterator.get().isValid() && !exception) {
+      batch.add(new ByteArrayKeyValue(
+          iterator.get().key(), iterator.get().value()));
       iterator.get().next();
-      ++count;
-      if (withinLimit(count) && iterator.get().isValid()) {
-        // If this is not the last entry, append comma
-        sb.append(", ");
+      count++;
+      if (batch.size() >= batchSize) {
+        while (logWriter.getInflightLogCount() > threadCount * 10L

Review Comment:
   This will not properly bound the tasks in progress. There are two queues: 1. the Task queue (the thread pool), and 2. the WriterTask queue. Here, however, we check only the logWriter queue.
   IMO, we should increase the in-flight count when a task is added here, and decrease it when the log is written in WriterTask, as is done currently.
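One way to implement this suggestion is a bounded in-flight counter that spans both queues. The sketch below uses `java.util.concurrent.Semaphore` in place of the PR's `AtomicLong`. `InflightLimiter` and `demo` are hypothetical names, and the executor task stands in for the writer thread. A permit is taken before a batch is submitted to the parser pool and is returned only after the batch has been written. As a result, the limit covers three groups together: batches waiting in the executor queue, batches being parsed, and batches waiting in the writer's map.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: one permit per batch, taken at submit time and
 * released only after the batch has been written.
 */
class InflightLimiter {
  private final Semaphore permits;

  InflightLimiter(int maxInflightBatches) {
    this.permits = new Semaphore(maxInflightBatches);
  }

  /** Reader thread: blocks while too many batches are submitted but unwritten. */
  void beforeSubmit() throws InterruptedException {
    permits.acquire();
  }

  /** Writer side: called once a batch has reached the output. */
  void afterWrite() {
    permits.release();
  }

  int availablePermits() {
    return permits.availablePermits();
  }

  /** Demo: returns the peak number of submitted-but-unwritten batches. */
  static int demo(int batches, int maxInflight, int threads)
      throws InterruptedException {
    InflightLimiter limiter = new InflightLimiter(maxInflight);
    AtomicInteger inflight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < batches; i++) {
      limiter.beforeSubmit();                   // count goes up at submit time
      peak.accumulateAndGet(inflight.incrementAndGet(), Math::max);
      pool.execute(() -> {
        // Parsing and writing the batch would happen here; in the PR the
        // write is done later by WriterTask, which would call afterWrite().
        inflight.decrementAndGet();
        limiter.afterWrite();                   // count goes down after write
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return peak.get();
  }
}
```

Because `acquire()` blocks, the reader thread does not need to poll the counter in a loop.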




Re: [PR] HDDS-9348. Improving parsing speed of DB Scanner [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1350321976


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,102 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter, boolean schemaV3) {
+    exception = false;
+    if (printWriter == null) {

Review Comment:
   nit: instead of passing a null value, we can make the call in the else branch at the call site:
   ```
   return displayTable(iterator, dbColumnFamilyDef, out(),
             schemaV3);
   ```


