You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/06/15 06:57:38 UTC

[incubator-inlong] branch master updated: [INLONG-638] Issues About Disk Error recovery

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a83dc82  [INLONG-638] Issues About Disk Error recovery
a83dc82 is described below

commit a83dc82513bac50cfebc7cb02783213e8617ecd8
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Jun 15 14:38:59 2021 +0800

    [INLONG-638] Issues About Disk Error recovery
---
 .../tubemq/server/broker/msgstore/disk/FileSegment.java     | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
index 949d149..d85c651 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -25,6 +25,7 @@ import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.utils.CheckSum;
+import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,6 +113,9 @@ public class FileSegment implements Segment {
                 this.cachedSize.set(this.channel.size());
                 this.flushedSize.set(this.cachedSize.get());
             } catch (final Exception e) {
+                if (e instanceof IOException) {
+                    ServiceStatusHolder.addReadIOErrCnt();
+                }
                 if (this.segmentType == SegmentType.DATA) {
                     logger.error("[File Store] Set DATA Segment cachedSize error", e);
                 } else {
@@ -133,6 +137,9 @@ public class FileSegment implements Segment {
                 }
                 this.randFile.close();
             } catch (Throwable ee) {
+                if (ee instanceof IOException) {
+                    ServiceStatusHolder.addReadIOErrCnt();
+                }
                 logger.error(new StringBuilder(512).append("[File Store] Close ")
                         .append(this.file.getAbsoluteFile().toString())
                         .append("'s ").append(segmentType).append(" file failure").toString(), ee);
@@ -152,6 +159,9 @@ public class FileSegment implements Segment {
             }
             this.randFile.close();
         } catch (Throwable e1) {
+            if (e1 instanceof IOException) {
+                ServiceStatusHolder.addReadIOErrCnt();
+            }
             logger.error("[File Store] failure to close channel ", e1);
         }
         try {
@@ -160,6 +170,9 @@ public class FileSegment implements Segment {
                     .append(file.getAbsoluteFile()).toString());
             this.file.delete();
         } catch (Throwable ee) {
+            if (ee instanceof IOException) {
+                ServiceStatusHolder.addReadIOErrCnt();
+            }
             logger.error("[File Store] failure to delete file ", ee);
         }
     }