You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flume.apache.org by "翟玉勇 (JIRA)" <ji...@apache.org> on 2018/08/09 09:41:00 UTC

[jira] [Updated] (FLUME-3266) the parameters of fsyncPerTransaction are not working

     [ https://issues.apache.org/jira/browse/FLUME-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

翟玉勇 updated FLUME-3266:
-----------------------
    Description: 
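Does every take or put transaction still sync to disk, even when fsyncPerTransaction is set to false? In the commit() method shown below, logFileWriter.sync() is called on every commit.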
{code}
this.fsyncPerTransaction = fsyncPerTransaction;
      this.fsyncInterval = fsyncInterval;
      if (!fsyncPerTransaction) {
        LOG.info("Sync interval = " + fsyncInterval);
        syncExecutor = Executors.newSingleThreadScheduledExecutor();
        syncExecutor.scheduleWithFixedDelay(new Runnable() {
          @Override
          public void run() {
            try {
              sync("Thread-other");
            } catch (Throwable ex) {
              LOG.error("Data file, " + getFile().toString() + " could not " +
                  "be synced to disk due to an error.", ex);
            }
          }
        }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
      } else {
        syncExecutor = null;
      }
{code}


{code}
private void commit(long transactionID, short type) throws IOException {
    Preconditions.checkState(open, "Log is closed");
    Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);
    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
    int logFileIndex = nextLogWriter(transactionID);
    long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
    long requiredSpace = minimumRequiredSpace + buffer.limit();
    if (usableSpace <= requiredSpace) {
      throw new IOException("Usable space exhausted, only " + usableSpace +
          " bytes remaining, required " + requiredSpace + " bytes");
    }
    boolean error = true;
    try {
      try {
        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
        // If multiple transactions are committing at the same time,
        // this ensures that the number of actual fsyncs is small and a
        // number of them are grouped together into one.
        logFileWriter.commit(buffer);
        logFileWriter.sync();
        error = false;
      } catch (LogFileRetryableIOException e) {
        if (!open) {
          throw e;
        }
        roll(logFileIndex, buffer);
        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
        logFileWriter.commit(buffer);
        logFileWriter.sync();
        error = false;
      }
    } finally {
      if (error && open) {
        roll(logFileIndex);
      }
    }
  }
{code}



  was:
every take or put transaction has sync disk?
{code}
this.fsyncPerTransaction = fsyncPerTransaction;
      this.fsyncInterval = fsyncInterval;
      if (!fsyncPerTransaction) {
        LOG.info("Sync interval = " + fsyncInterval);
        syncExecutor = Executors.newSingleThreadScheduledExecutor();
        syncExecutor.scheduleWithFixedDelay(new Runnable() {
          @Override
          public void run() {
            try {
              sync("Thread-other");
            } catch (Throwable ex) {
              LOG.error("Data file, " + getFile().toString() + " could not " +
                  "be synced to disk due to an error.", ex);
            }
          }
        }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
      } else {
        syncExecutor = null;
      }
{code}
private void commit(long transactionID, short type) throws IOException {
    Preconditions.checkState(open, "Log is closed");
    Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);
    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
    int logFileIndex = nextLogWriter(transactionID);
    long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
    long requiredSpace = minimumRequiredSpace + buffer.limit();
    if (usableSpace <= requiredSpace) {
      throw new IOException("Usable space exhausted, only " + usableSpace +
          " bytes remaining, required " + requiredSpace + " bytes");
    }
    boolean error = true;
    try {
      try {
        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
        // If multiple transactions are committing at the same time,
        // this ensures that the number of actual fsyncs is small and a
        // number of them are grouped together into one.
        logFileWriter.commit(buffer);
        logFileWriter.sync();
        error = false;
      } catch (LogFileRetryableIOException e) {
        if (!open) {
          throw e;
        }
        roll(logFileIndex, buffer);
        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
        logFileWriter.commit(buffer);
        logFileWriter.sync();
        error = false;
      }
    } finally {
      if (error && open) {
        roll(logFileIndex);
      }
    }
  }





> the parameters of fsyncPerTransaction are not working 
> ------------------------------------------------------
>
>                 Key: FLUME-3266
>                 URL: https://issues.apache.org/jira/browse/FLUME-3266
>             Project: Flume
>          Issue Type: Bug
>          Components: File Channel
>    Affects Versions: 1.8.0
>            Reporter: 翟玉勇
>            Priority: Minor
>
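> Does every take or put transaction still sync to disk, even when fsyncPerTransaction is set to false? In the commit() method shown below, logFileWriter.sync() is called on every commit.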
> {code}
> this.fsyncPerTransaction = fsyncPerTransaction;
>       this.fsyncInterval = fsyncInterval;
>       if (!fsyncPerTransaction) {
>         LOG.info("Sync interval = " + fsyncInterval);
>         syncExecutor = Executors.newSingleThreadScheduledExecutor();
>         syncExecutor.scheduleWithFixedDelay(new Runnable() {
>           @Override
>           public void run() {
>             try {
>               sync("Thread-other");
>             } catch (Throwable ex) {
>               LOG.error("Data file, " + getFile().toString() + " could not " +
>                   "be synced to disk due to an error.", ex);
>             }
>           }
>         }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
>       } else {
>         syncExecutor = null;
>       }
> {code}
> {code}
> private void commit(long transactionID, short type) throws IOException {
>     Preconditions.checkState(open, "Log is closed");
>     Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);
>     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
>     int logFileIndex = nextLogWriter(transactionID);
>     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
>     long requiredSpace = minimumRequiredSpace + buffer.limit();
>     if (usableSpace <= requiredSpace) {
>       throw new IOException("Usable space exhausted, only " + usableSpace +
>           " bytes remaining, required " + requiredSpace + " bytes");
>     }
>     boolean error = true;
>     try {
>       try {
>         LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
>         // If multiple transactions are committing at the same time,
>         // this ensures that the number of actual fsyncs is small and a
>         // number of them are grouped together into one.
>         logFileWriter.commit(buffer);
>         logFileWriter.sync();
>         error = false;
>       } catch (LogFileRetryableIOException e) {
>         if (!open) {
>           throw e;
>         }
>         roll(logFileIndex, buffer);
>         LogFile.Writer logFileWriter = logFiles.get(logFileIndex);
>         logFileWriter.commit(buffer);
>         logFileWriter.sync();
>         error = false;
>       }
>     } finally {
>       if (error && open) {
>         roll(logFileIndex);
>       }
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@flume.apache.org
For additional commands, e-mail: issues-help@flume.apache.org