Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2021/12/08 19:19:02 UTC

[GitHub] [bookkeeper] mauricebarnum opened a new pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

mauricebarnum opened a new pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932


   Descriptions of the changes in this PR:
   
   
   
   ### Motivation
   
   BookKeeper's entry log writes are buffered in application code before being submitted to the operating system, which then buffers them again.  The cost of this "double buffering" becomes a limiting factor for write throughput.
   
   This set of changes adds optional support to bypass the operating system buffering on supported systems (currently Linux and MacOS) by using the open(2) flag O_DIRECT.  fallocate(2) is used, if available, to request that the filesystem allocate the required space before data is written.
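Bypassing the page cache with O_DIRECT typically comes with alignment constraints: on Linux, the buffer address, transfer size, and file offset generally need to be multiples of the device's logical block size. That is presumably why the `DirectEntryLogger` constructor quoted later in this thread rounds sizes with `Buffer.nextAlignment(...)`. The following is a minimal sketch of that kind of rounding plus aligned direct-buffer allocation, assuming a hypothetical 4096-byte alignment; `nextAlignment` and `allocateAligned` are illustrative helpers, not the PR's `Buffer` class.

```java
import java.nio.ByteBuffer;

public class AlignedBuffers {
    // Hypothetical alignment; real code should use the device's logical block size.
    static final int ALIGNMENT = 4096;

    // Round size up to the next multiple of ALIGNMENT (ALIGNMENT must be a power of two).
    static int nextAlignment(int size) {
        return (size + ALIGNMENT - 1) & -ALIGNMENT;
    }

    // Allocate a direct buffer whose start address and capacity are both ALIGNMENT-aligned.
    static ByteBuffer allocateAligned(int size) {
        int aligned = nextAlignment(size);
        // Over-allocate by ALIGNMENT so an aligned region of the requested size always fits.
        ByteBuffer raw = ByteBuffer.allocateDirect(aligned + ALIGNMENT);
        ByteBuffer slice = raw.alignedSlice(ALIGNMENT); // Java 9+
        slice.limit(aligned);
        return slice.slice();
    }

    public static void main(String[] args) {
        System.out.println(nextAlignment(1));    // 4096
        System.out.println(nextAlignment(4096)); // 4096
        System.out.println(nextAlignment(4097)); // 8192
        ByteBuffer buf = allocateAligned(10_000);
        System.out.println(buf.capacity());                     // 12288
        System.out.println(buf.alignmentOffset(0, ALIGNMENT));  // 0
    }
}
```

Over-allocating by one alignment unit and slicing is a common pure-Java way to get an aligned region; the PR itself obtains its buffers through its own native-io binding.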
   
   Access to the I/O system calls is provided through a JNI binding included in bookkeeper/native-io.
   
   ### Changes
   
   - Refactor EntryLogger by introducing an interface, EntryLoggerIface.  The direct-io entry logger will implement this interface.
   - Add package bookkeeper-slogger to provide support for structured logging with a pluggable logging backend.  Provide an implementation using SLF4J.
   - Add native-io package to provide JNI bindings to the operating system I/O API.
   - Refactor garbage collection and compaction to allow the entry logger to control which files are available to be garbage collected.
   - Add EntryLoggerIface implementation DirectEntryLogger, enabled with `dbStorage_directIOEntryLogger=true`.
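To illustrate what "structured logging with a pluggable logging backend" means, here is a minimal, hypothetical sketch loosely modeled on the `slogParent.kv("directory", ledgerDir)` call that appears in `DirectEntryLogger` later in this thread. The `Backend`, `StructuredLogger`, `kv`, and `info` names, the event name, and the output format are all assumptions for illustration, not the actual bookkeeper-slogger API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SloggerSketch {
    // Pluggable backend: receives an event name plus an ordered list of key/value pairs.
    interface Backend {
        void log(String event, List<Object> keyValues);
    }

    // Immutable logger: kv() returns a child logger carrying one more context pair.
    static final class StructuredLogger {
        private final Backend backend;
        private final List<Object> context;

        StructuredLogger(Backend backend, List<Object> context) {
            this.backend = backend;
            this.context = Collections.unmodifiableList(new ArrayList<>(context));
        }

        StructuredLogger kv(String key, Object value) {
            List<Object> child = new ArrayList<>(context);
            child.add(key);
            child.add(value);
            return new StructuredLogger(backend, child);
        }

        void info(String event) {
            backend.log(event, context);
        }
    }

    // Example backend that renders pairs as "event key=value ...".
    static final class StringBackend implements Backend {
        final List<String> lines = new ArrayList<>();

        @Override
        public void log(String event, List<Object> kv) {
            StringBuilder sb = new StringBuilder(event);
            for (int i = 0; i + 1 < kv.size(); i += 2) {
                sb.append(' ').append(kv.get(i)).append('=').append(kv.get(i + 1));
            }
            lines.add(sb.toString());
        }
    }

    public static void main(String[] args) {
        StringBackend backend = new StringBackend();
        StructuredLogger root = new StructuredLogger(backend, Collections.emptyList());
        StructuredLogger dirLog = root.kv("directory", "/data/ledgers");
        dirLog.kv("logId", 7).info("ENTRYLOG_CREATED");
        System.out.println(backend.lines.get(0)); // ENTRYLOG_CREATED directory=/data/ledgers logId=7
    }
}
```

Because call sites only see the logger, swapping `StringBackend` for one that forwards to SLF4J would not change them.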
   
   > ---
   > In order to uphold a high standard for quality for code contributions, Apache BookKeeper runs various precommit
   > checks for pull requests. A pull request can only be merged when it passes precommit checks.
   >
   > ---
   > Be sure to do all of the following to help us incorporate your contribution
   > quickly and easily:
   >
   > If this PR is a BookKeeper Proposal (BP):
   >
   > - [ ] Make sure the PR title is formatted like:
   >     `<BP-#>: Description of bookkeeper proposal`
   >     `e.g. BP-1: 64 bits ledger is support`
   > - [ ] Attach the master issue link in the description of this PR.
   > - [ ] Attach the google doc link if the BP is written in Google Doc.
   >
   > Otherwise:
   > 
   > - [ ] Make sure the PR title is formatted like:
   >     `<Issue #>: Description of pull request`
   >     `e.g. Issue 123: Description ...`
   > - [ ] Make sure tests pass via `mvn clean apache-rat:check install spotbugs:check`.
   > - [ ] Replace `<Issue #>` in the title with the actual Issue number.
   > 
   > ---
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bookkeeper] mauricebarnum commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-995042027


   I've waffled a bit on how far to collapse these commits.  All of the commits following "structured logger" logically fit together, but some of the changes to common code become unwieldy to follow, and they don't make sense as standalone commits.  I thought about pushing the JNA->JNI changes up the history, but that introduces so many point changes that I didn't want to risk introducing "transitory" bugs.





[GitHub] [bookkeeper] mauricebarnum commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1049100550


   rerun failure checks





[GitHub] [bookkeeper] mauricebarnum commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r780096618



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/WriterWithMetadata.java
##########
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.IOException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+
+/**
+ * WriterWithMetadata.
+ */
+class WriterWithMetadata {
+    private final LogWriter writer;
+    private final EntryLogMetadata metadata;
+    private final ByteBufAllocator allocator;
+
+    WriterWithMetadata(LogWriter writer, EntryLogMetadata metadata,
+                       ByteBufAllocator allocator) throws IOException {
+        this.writer = writer;
+        this.metadata = metadata;
+        this.allocator = allocator;
+
+        ByteBuf buf = allocator.buffer(Buffer.ALIGNMENT);
+        try {

Review comment:
       No. The naming/layering is confusing.
   
   1. newDirectWriter() -> construct DirectWriter, write empty header
   2. WritingDirectCompactionEntryLog constructor -> construct DirectWriter, pass to WriterWithMetadata constructor, which then writes the empty header.
   
   The way the log file name is calculated differs between the two code paths, and there are other side effects.
   
   Had the result of newDirectWriter() been used to construct WriterWithMetadata, the header would have been written twice, but at offset zero both times.
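Why a second header write "at offset zero" is harmless can be shown with positional writes: a write to an explicit file offset overwrites the same bytes instead of appending. A minimal sketch using the JDK's `FileChannel.write(ByteBuffer, long)`, assuming a hypothetical 4096-byte zero-filled header (not the actual entry log header layout):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class HeaderRewrite {
    // Hypothetical header size; the real header format is defined by the PR.
    static final int HEADER_SIZE = 4096;

    // Write a placeholder header at offset 0; positional writes ignore the channel position.
    static void writeHeader(FileChannel ch) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE); // zero-filled
        long pos = 0;
        while (header.hasRemaining()) {
            pos += ch.write(header, pos);
        }
    }

    // Write the header twice and return the resulting file size.
    static long writeTwiceAndMeasure(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            writeHeader(ch); // first write, e.g. when the writer is created
            writeHeader(ch); // second write of the same header, again at offset 0
            return ch.size();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("entrylog", ".log");
        try {
            System.out.println(writeTwiceAndMeasure(file)); // 4096, not 8192
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

With appending writes instead, the second header would land at offset 4096, which is why the offset matters here.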
   
   
   







[GitHub] [bookkeeper] hangc0276 commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r826749675



##########
File path: bookkeeper-slogger/slf4j/src/main/java/org/apache/bookkeeper/slogger/slf4j/Slf4jSlogger.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.slogger.slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.bookkeeper.slogger.AbstractSlogger;
+import org.apache.bookkeeper.slogger.Slogger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Slf4j implementation of slogger.
+ */
+public class Slf4jSlogger extends AbstractSlogger {
+    private ThreadLocal<List<String>> mdcKeysTls = new ThreadLocal<List<String>>() {
+            @Override
+            protected List<String> initialValue() {
+                return new ArrayList<>();
+            }
+        };
+
+    private final Logger log;
+
+    public Slf4jSlogger(Class<?> clazz) {
+        this(clazz, Collections.emptyList());
+    }
+
+    Slf4jSlogger() {
+        this(Slf4jSlogger.class);
+    }
+
+    Slf4jSlogger(Class<?> clazz, Iterable<Object> parent) {
+        super(parent);
+        this.log = LoggerFactory.getLogger(clazz);
+    }
+
+    @Override
+    protected Slogger newSlogger(Optional<Class<?>> clazz, Iterable<Object> parent) {
+        return new Slf4jSlogger(clazz.orElse(Slf4jSlogger.class), parent);
+    }
+
+    @Override
+    protected void doLog(Level level, Enum<?> event, String message,
+                         Throwable throwable, List<Object> keyValues) {
+        List<String> mdcKeys = mdcKeysTls.get();
+        mdcKeys.clear();
+        try {
+            if (event != null) {
+                MDC.put("event", event.toString());
+                mdcKeys.add("event");
+            }
+
+            for (int i = 0; i < keyValues.size(); i += 2) {
+                MDC.put(keyValues.get(i).toString(), keyValues.get(i + 1).toString());
+                mdcKeys.add(keyValues.get(i).toString());
+            }
+
+            String msg = message == null ? event.toString() : message;
+            switch (level) {
+            case INFO:
+                log.info(msg);

Review comment:
       Do we need to add a `%X` pattern to the `ConversionPattern` in log4j.properties? Otherwise, the KVs in the MDC won't be printed.
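The `doLog` method above copies the event and key/value pairs into SLF4J's MDC before logging, so they reach the output only if the layout references the MDC, which is the point of the `%X` question. The quoted hunk is cut off before any cleanup, but the `mdcKeys` list and the `try` suggest that the added keys are removed afterwards. A minimal sketch of that put/emit/remove pattern, using a hypothetical thread-local map in place of the real `org.slf4j.MDC` so it runs without dependencies (all names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class MdcSketch {
    // Stand-in for org.slf4j.MDC: a per-thread String -> String context map.
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    // Reusable per-thread list of keys added by the current call (mirrors mdcKeysTls).
    static final ThreadLocal<List<String>> ADDED_KEYS = ThreadLocal.withInitial(ArrayList::new);

    // Put the event and key/value pairs into the context, emit, then remove only what was added.
    static void logWithContext(String event, List<Object> keyValues,
                               Consumer<Map<String, String>> sink) {
        Map<String, String> ctx = CONTEXT.get();
        List<String> added = ADDED_KEYS.get();
        added.clear();
        try {
            if (event != null) {
                ctx.put("event", event);
                added.add("event");
            }
            // i + 1 < size guards against an odd-length key/value list.
            for (int i = 0; i + 1 < keyValues.size(); i += 2) {
                String key = String.valueOf(keyValues.get(i));
                ctx.put(key, String.valueOf(keyValues.get(i + 1)));
                added.add(key);
            }
            // A real layout would render these via a %X-style conversion; here we copy them out.
            sink.accept(new HashMap<>(ctx));
        } finally {
            for (String key : added) {
                ctx.remove(key);
            }
            added.clear();
        }
    }

    public static void main(String[] args) {
        List<Map<String, String>> emitted = new ArrayList<>();
        logWithContext("ENTRYLOG_ROLLED", List.of("logId", 3, "directory", "/data"), emitted::add);
        System.out.println(emitted.get(0).get("logId")); // 3
        System.out.println(CONTEXT.get().isEmpty());     // true: nothing leaks into later calls
    }
}
```

Removing the keys in `finally` keeps one log call's context from leaking into the next call on the same thread.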







[GitHub] [bookkeeper] hangc0276 commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1045880017


   rerun failure checks





[GitHub] [bookkeeper] hangc0276 commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r825581614



##########
File path: bookkeeper-slogger/slf4j/build.gradle
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+plugins {
+    id 'java'
+}
+
+dependencies {
+    implementation project(':bookkeeper-slogger:api')
+    implementation depLibs.slf4j
+    testImplementation depLibs.slf4jSimple
+    testImplementation depLibs.junit
+}
+
+jar.archiveBaseName = 'bookkeeper-slogger-slf4j'

Review comment:
       The published jar name should start with `org.apache.bookkeeper-`.

##########
File path: bookkeeper-slogger/api/build.gradle
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+plugins {
+    id 'java'
+}
+
+dependencies {
+    testImplementation depLibs.hamcrest
+    testImplementation depLibs.junit
+}
+
+jar.archiveBaseName = 'bookkeeper-slogger-api'

Review comment:
       The published jar name should start with `org.apache.bookkeeper-`.







[GitHub] [bookkeeper] hangc0276 commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r771386466



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
##########
@@ -77,6 +83,18 @@
     public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
 
     public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+    public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger";
+    public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB =
+            "dbStorage_directIOEntryLoggerTotalWriteBufferSizeMb";
+    public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB =
+            "dbStorage_directIOEntryLoggerReadBufferSizeMb";

Review comment:
       Should this be `dbStorage_directIOEntryLoggerTotalReadBufferSizeMb`?
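The "Total" in the suggested name matters because, in the `DirectEntryLogger` constructor quoted later in this thread, the total read buffer memory is divided across the read threads (`totalReadBufferSize / numReadThreads`), and a per-thread share smaller than one read buffer causes readers to be evicted from the cache immediately. A small worked sketch of that arithmetic, with hypothetical sizes and a hypothetical 4096-byte alignment (the helper names are illustrative):

```java
public class ReadBufferBudget {
    static final int ALIGNMENT = 4096; // hypothetical alignment

    // Round size up to the next multiple of ALIGNMENT (a power of two).
    static int nextAlignment(int size) {
        return (size + ALIGNMENT - 1) & -ALIGNMENT;
    }

    // How many aligned read buffers fit in each read thread's share of the total budget.
    static long readersPerThread(long totalReadBufferSize, int numReadThreads,
                                 int unalignedReadBufferSize) {
        long perThreadBufferSize = totalReadBufferSize / numReadThreads;
        int readBufferSize = nextAlignment(unalignedReadBufferSize);
        // Zero means even a single cached reader exceeds the per-thread budget.
        return perThreadBufferSize / readBufferSize;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 64 MB total across 8 threads = 8 MB per thread; 4 MB buffers -> 2 per thread.
        System.out.println(readersPerThread(64 * mb, 8, (int) (4 * mb))); // 2
        // 8 MB total across 16 threads = 512 KB per thread; a 1 MB buffer does not fit.
        System.out.println(readersPerThread(8 * mb, 16, (int) mb));       // 0
    }
}
```

The sketch compares against the aligned buffer size, which is what the parameter rename in the later diff (`readBufferSize` to `unalignedReadBufferSize`) appears intended to ensure.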

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/WriterWithMetadata.java
##########
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.IOException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+
+/**
+ * WriterWithMetadata.
+ */
+class WriterWithMetadata {
+    private final LogWriter writer;
+    private final EntryLogMetadata metadata;
+    private final ByteBufAllocator allocator;
+
+    WriterWithMetadata(LogWriter writer, EntryLogMetadata metadata,
+                       ByteBufAllocator allocator) throws IOException {
+        this.writer = writer;
+        this.metadata = metadata;
+        this.allocator = allocator;
+
+        ByteBuf buf = allocator.buffer(Buffer.ALIGNMENT);
+        try {

Review comment:
       Does the writer write the empty header twice? The empty header is already written when `newDirectWriter` creates the writer.







[GitHub] [bookkeeper] mauricebarnum commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r780096618



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
##########
@@ -77,6 +83,18 @@
     public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
 
     public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+    public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger";
+    public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB =
+            "dbStorage_directIOEntryLoggerTotalWriteBufferSizeMb";
+    public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB =
+            "dbStorage_directIOEntryLoggerReadBufferSizeMb";

Review comment:
       Good catch. I'll fix it.







[GitHub] [bookkeeper] hangc0276 commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1012878965


   Looking forward to this feature.





[GitHub] [bookkeeper] mauricebarnum commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r830276230



##########
File path: bookkeeper-slogger/slf4j/build.gradle
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+plugins {
+    id 'java'
+}
+
+dependencies {
+    implementation project(':bookkeeper-slogger:api')
+    implementation depLibs.slf4j
+    testImplementation depLibs.slf4jSimple
+    testImplementation depLibs.junit
+}
+
+jar.archiveBaseName = 'bookkeeper-slogger-slf4j'

Review comment:
       I only overrode the name to avoid getting tripped up by the Log4Shell OWASP regex.  In general, we don't include `org.apache.bookkeeper-` in the jar file names, or even bother to rename rather generic ones like `stream/clients/java/kv/build/libs/kv.jar`.
   
   ```
   ephemeral ~/src/bookkeeper-directio |> ls **/*.jar | wc -l
         40
   ephemeral ~/src/bookkeeper-directio |> ls **/*.jar | fgrep org.apache
   ephemeral ~/src/bookkeeper-directio |>
   ```
   







[GitHub] [bookkeeper] mauricebarnum commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r830247077



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
##########
@@ -0,0 +1,508 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
+import org.apache.bookkeeper.bookie.AbstractLogCompactor;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
+import org.apache.bookkeeper.bookie.storage.EntryLogIds;
+import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLoggerIface;
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * DirectEntryLogger.
+ */
+public class DirectEntryLogger implements EntryLoggerIface {
+    private static final String LOGFILE_SUFFIX = ".log";
+    private final Slogger slog;
+    private final File ledgerDir;
+    private final EntryLogIds ids;
+    private final ExecutorService writeExecutor;
+    private final ExecutorService flushExecutor;
+    private final long maxFileSize;
+    private final DirectEntryLoggerStats stats;
+    private final ByteBufAllocator allocator;
+    private final BufferPool writeBuffers;
+    private final int readBufferSize;
+    private final int maxSaneEntrySize;
+    private final Set<Integer> unflushedLogs;
+
+    private WriterWithMetadata curWriter;
+
+    private List<Future<?>> pendingFlushes;
+    private final NativeIO nativeIO;
+    private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>();
+    private final ThreadLocal<Cache<Integer, LogReader>> caches;
+
+    private static final int NUMBER_OF_WRITE_BUFFERS = 8;
+
+    public DirectEntryLogger(File ledgerDir,
+                             EntryLogIds ids,
+                             NativeIO nativeIO,
+                             ByteBufAllocator allocator,
+                             ExecutorService writeExecutor,
+                             ExecutorService flushExecutor,
+                             long maxFileSize,
+                             int maxSaneEntrySize,
+                             long totalWriteBufferSize,
+                             long totalReadBufferSize,
+                             int readBufferSize,
+                             int numReadThreads,
+                             int maxFdCacheTimeSeconds,
+                             Slogger slogParent,
+                             StatsLogger stats) throws IOException {
+        this.ledgerDir = ledgerDir;
+        this.flushExecutor = flushExecutor;
+        this.writeExecutor = writeExecutor;
+        this.pendingFlushes = new ArrayList<>();
+        this.nativeIO = nativeIO;
+        this.unflushedLogs = ConcurrentHashMap.newKeySet();
+
+        this.maxFileSize = maxFileSize;
+        this.maxSaneEntrySize = maxSaneEntrySize;
+        this.readBufferSize = Buffer.nextAlignment(readBufferSize);
+        this.ids = ids;
+        this.slog = slogParent.kv("directory", ledgerDir).ctx();
+
+        this.stats = new DirectEntryLoggerStats(stats);
+
+        this.allocator = allocator;
+
+        int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
+        this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
+
+        // The total read buffer memory needs to get split across all the read threads, since the caches
+        // are thread-specific and we want to ensure we don't pass the total memory limit.
+        long perThreadBufferSize = totalReadBufferSize / numReadThreads;
+
+        // if the amount of total read buffer size is too low, and/or the number of read threads is too high
+        // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
+        // from the cache
+        if (perThreadBufferSize < readBufferSize) {

Review comment:
       ```diff
   diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
   index 41417df57..fab76d2af 100644
   --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
   +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
   @@ -98,7 +98,7 @@ public class DirectEntryLogger implements EntryLoggerIface {
                                 int maxSaneEntrySize,
                                 long totalWriteBufferSize,
                                 long totalReadBufferSize,
   -                             int readBufferSize,
   +                             int unalignedReadBufferSize,
                                 int numReadThreads,
                                 int maxFdCacheTimeSeconds,
                                 Slogger slogParent,
   @@ -112,7 +112,7 @@ public class DirectEntryLogger implements EntryLoggerIface {
   
            this.maxFileSize = maxFileSize;
            this.maxSaneEntrySize = maxSaneEntrySize;
   -        this.readBufferSize = Buffer.nextAlignment(readBufferSize);
   +        this.readBufferSize = Buffer.nextAlignment(unalignedReadBufferSize);
            this.ids = ids;
            this.slog = slogParent.kv("directory", ledgerDir).ctx();
   
   @@ -131,7 +131,7 @@ public class DirectEntryLogger implements EntryLoggerIface {
            // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
            // from the cache
            if (perThreadBufferSize < readBufferSize) {
   -            slog.kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
   +            slog.kv("reason", "perThreadBufferSize lower than aligned readBufferSize (causes immediate reader cache eviction)")
                    .kv("totalReadBufferSize", totalReadBufferSize)
                    .kv("totalNumReadThreads", numReadThreads)
                    .kv("readBufferSize", readBufferSize)
   ```
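   The suggested rename matters because, in the original constructor, the parameter `readBufferSize` shadows the field of the same name. After `this.readBufferSize = Buffer.nextAlignment(readBufferSize)`, every later unqualified use in the constructor (the misconfiguration check, `maxCachedReadersPerThread`, and the cache weigher lambda) still sees the unaligned parameter. Renaming the parameter makes those uses resolve to the aligned field. The sketch below illustrates only the shadowing; `ALIGNMENT = 4096` and `nextAlignment` are assumptions standing in for the PR's `Buffer.ALIGNMENT` and `Buffer.nextAlignment`, and the two nested classes are hypothetical.

```java
public class ShadowingDemo {
    // Assumption: 4 KiB alignment, standing in for Buffer.ALIGNMENT in the PR.
    static final int ALIGNMENT = 4096;

    // Hypothetical stand-in for Buffer.nextAlignment: round up to a multiple of ALIGNMENT.
    static int nextAlignment(int size) {
        return ((size + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
    }

    /** Shape of the original constructor: the parameter shadows the field. */
    static class Shadowed {
        final int readBufferSize;
        final long maxCachedReadersPerThread;

        Shadowed(long perThreadBufferSize, int readBufferSize) {
            this.readBufferSize = nextAlignment(readBufferSize);
            // The unqualified name resolves to the parameter, i.e. the UNALIGNED size.
            this.maxCachedReadersPerThread = perThreadBufferSize / readBufferSize;
        }
    }

    /** Shape of the suggested fix: renaming the parameter removes the shadowing. */
    static class Renamed {
        final int readBufferSize;
        final long maxCachedReadersPerThread;

        Renamed(long perThreadBufferSize, int unalignedReadBufferSize) {
            this.readBufferSize = nextAlignment(unalignedReadBufferSize);
            // The unqualified name now resolves to the field, i.e. the ALIGNED size.
            this.maxCachedReadersPerThread = perThreadBufferSize / readBufferSize;
        }
    }

    public static void main(String[] args) {
        long perThreadBufferSize = 1_000_000L;
        int requested = 5_000; // not a multiple of 4096, so it is rounded up to 8192
        Shadowed shadowed = new Shadowed(perThreadBufferSize, requested);
        Renamed renamed = new Renamed(perThreadBufferSize, requested);
        System.out.println("aligned size:            " + renamed.readBufferSize);             // 8192
        System.out.println("readers/thread (before): " + shadowed.maxCachedReadersPerThread); // 200
        System.out.println("readers/thread (after):  " + renamed.maxCachedReadersPerThread);  // 122
    }
}
```

   With these example numbers the shadowed version would admit 200 cached readers per thread. If each reader actually holds an 8192-byte aligned buffer, that is 1,638,400 bytes against a 1,000,000-byte per-thread budget, which is the overrun the rename avoids.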




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bookkeeper] hangc0276 commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1068634837


   @mauricebarnum, would you please send a proposal to the dev@bookkeeper.apache.org mailing list for discussion?
   





[GitHub] [bookkeeper] mauricebarnum commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1023904859


   rerun failure checks





[GitHub] [bookkeeper] hangc0276 commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1066927092


   Would you please address the CI failure?
   ```
   + dev/check-binary-license dev/../bookkeeper-dist/server/build/distributions/bookkeeper-server-4.15.0-SNAPSHOT-bin.tar.gz
   bookkeeper-slogger-api.jar unaccounted for in LICENSE
   bookkeeper-slogger-slf4j.jar unaccounted for in LICENSE
   org.slf4j-slf4j-api-1.7.36.jar unaccounted for in LICENSE
   org.slf4j-slf4j-api-1.7.32.jar mentioned in LICENSE, but not bundled
   ```
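   The four messages read like two set differences between the jars bundled in the server tarball and the jars named in LICENSE. The sketch below is a hypothetical reconstruction of that comparison from the output alone; the real `dev/check-binary-license` script is not shown here and may work differently. Read this way, the output suggests adding LICENSE entries for the two new slogger jars and updating the slf4j-api entry from 1.7.32 to 1.7.36.

```java
import java.util.Set;
import java.util.TreeSet;

public class LicenseCheckSketch {
    /** Jars shipped in the tarball but never mentioned in LICENSE. */
    static Set<String> unaccounted(Set<String> bundled, Set<String> mentioned) {
        Set<String> result = new TreeSet<>(bundled);
        result.removeAll(mentioned);
        return result;
    }

    /** Jars mentioned in LICENSE but missing from the tarball. */
    static Set<String> notBundled(Set<String> bundled, Set<String> mentioned) {
        Set<String> result = new TreeSet<>(mentioned);
        result.removeAll(bundled);
        return result;
    }

    public static void main(String[] args) {
        // Jar names taken from the CI output above; the sets are illustrative, not complete.
        Set<String> bundled = Set.of(
            "bookkeeper-slogger-api.jar",
            "bookkeeper-slogger-slf4j.jar",
            "org.slf4j-slf4j-api-1.7.36.jar");
        Set<String> mentioned = Set.of("org.slf4j-slf4j-api-1.7.32.jar");

        for (String jar : unaccounted(bundled, mentioned)) {
            System.out.println(jar + " unaccounted for in LICENSE");
        }
        for (String jar : notBundled(bundled, mentioned)) {
            System.out.println(jar + " mentioned in LICENSE, but not bundled");
        }
    }
}
```

   Running `main` prints the same four lines as the CI log, which is consistent with (but does not prove) this reading of the check.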





[GitHub] [bookkeeper] hangc0276 commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r826557707



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java
##########
@@ -0,0 +1,318 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    final Object bufferLock = new Object();
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
+    Buffer nativeBuffer;
+    long offset;
+    private static volatile boolean useFallocate = true;
+
+    DirectWriter(int id,
+                 String filename,
+                 long maxFileSize,
+                 ExecutorService writeExecutor,
+                 BufferPool bufferPool,
+                 NativeIO nativeIO, Slogger slog) throws IOException {
+        checkArgument(maxFileSize > 0, "Max file size (%s) must be positive", maxFileSize);
+        this.id = id;
+        this.filename = filename;
+        this.writeExecutor = writeExecutor;
+        this.nativeIO = nativeIO;
+
+        offset = 0;
+
+        try {
+            fd = nativeIO.open(filename,
+                               NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
+                               00755);
+            checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage()).kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        }
+
+        if (useFallocate) {
+            if (!SystemUtils.IS_OS_LINUX) {
+                useFallocate = false;
+                slog.warn(Events.FALLOCATE_NOT_AVAILABLE);
+            } else {
+                try {
+                    int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize);
+                    checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret);
+                } catch (NativeIOException ex) {
+                    // fallocate(2) is not supported on all filesystems.  Since this is an optimization, disable
+                    // subsequent usage instead of failing the operation.
+                    useFallocate = false;
+                    slog.kv("message", ex.getMessage())
+                        .kv("file", filename)
+                        .kv("errno", ex.getErrno())
+                        .warn(Events.FALLOCATE_NOT_AVAILABLE);
+                }
+            }
+        }
+
+        this.bufferPool = bufferPool;
+        this.nativeBuffer = bufferPool.acquire();
+    }
+
+    @Override
+    public int logId() {
+        return id;
+    }
+
+    @Override
+    public void writeAt(long offset, ByteBuf buf) throws IOException {
+        checkArgument(Buffer.isAligned(offset),
+                      "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset);
+        checkArgument(Buffer.isAligned(buf.readableBytes()),
+                      "Buffer must write multiple of alignment bytes (%d), %d is not",
+                      Buffer.ALIGNMENT, buf.readableBytes());
+        Buffer tmpBuffer = bufferPool.acquire();
+        int bytesToWrite = buf.readableBytes();
+        tmpBuffer.reset();
+        tmpBuffer.writeByteBuf(buf);
+        Future<?> f = writeExecutor.submit(() -> {
+                try {
+                    int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset);
+                    if (ret != bytesToWrite) {
+                        throw new IOException(exMsg("Incomplete write")
+                                              .kv("filename", filename)
+                                              .kv("writeSize", bytesToWrite)
+                                              .kv("bytesWritten", ret)
+                                              .kv("offset", offset).toString());
+                    }
+                } catch (NativeIOException ne) {
+                    throw new IOException(exMsg("Write error")
+                                          .kv("filename", filename)
+                                          .kv("writeSize", bytesToWrite)
+                                          .kv("errno", ne.getErrno())
+                                          .kv("offset", offset).toString());
+                } finally {
+                    bufferPool.release(tmpBuffer);
+                }
+                return null;
+            });
+        addOutstandingWrite(f);
+    }
+
+    @Override
+    public int writeDelimited(ByteBuf buf) throws IOException {
+        synchronized (bufferLock) {
+            if (!nativeBuffer.hasSpace(serializedSize(buf))) {
+                flushBuffer();

Review comment:
       A flush is triggered only when the nativeBuffer has no remaining space.
   
   However, if the disk is an SSD, it performs better with smaller I/Os than with larger ones.
   
   The default nativeBuffer size is `0.125 * direct memory / number of ledger directories / 8`, and it can be changed through its own configuration parameter.
   
   It would be better to make the flush threshold configurable via `flushEntrylogBytes`.
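   The default size quoted in the comment lines up with the `DirectEntryLogger` constructor, which divides `totalWriteBufferSize` by `NUMBER_OF_WRITE_BUFFERS` (8) and rounds up to the alignment. The sketch below works through that arithmetic and adds a hypothetical early-flush check of the kind the reviewer suggests. `ALIGNMENT = 4096`, `shouldFlush`, and `flushThresholdBytes` are assumptions for illustration; they are not the PR's code or an existing `flushEntrylogBytes` implementation.

```java
public class WriteBufferSizing {
    // Mirrors NUMBER_OF_WRITE_BUFFERS in DirectEntryLogger.
    static final int NUMBER_OF_WRITE_BUFFERS = 8;
    // Assumption: 4 KiB alignment, standing in for Buffer.ALIGNMENT.
    static final int ALIGNMENT = 4096;

    // Round up to the next multiple of ALIGNMENT.
    static long alignUp(long size) {
        return ((size + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
    }

    /** Default per-buffer size as described in the review: 0.125 * directMemory / ledgerDirs / 8. */
    static long defaultSingleBufferSize(long directMemoryBytes, int numLedgerDirs) {
        long totalWriteBufferSize = (long) (0.125 * directMemoryBytes) / numLedgerDirs;
        return alignUp(totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS);
    }

    /**
     * Hypothetical flush policy: flush when the next entry would not fit, or when the
     * bytes already buffered reach a configurable threshold. A threshold of 0 disables
     * the early flush, which reproduces the "flush only when full" behavior.
     */
    static boolean shouldFlush(long bufferedBytes, long nextEntryBytes,
                               long capacity, long flushThresholdBytes) {
        boolean noSpace = bufferedBytes + nextEntryBytes > capacity;
        boolean overThreshold = flushThresholdBytes > 0 && bufferedBytes >= flushThresholdBytes;
        return noSpace || overThreshold;
    }

    public static void main(String[] args) {
        long directMemory = 4L * 1024 * 1024 * 1024; // example: 4 GiB of direct memory
        long single = defaultSingleBufferSize(directMemory, 2); // example: 2 ledger directories
        System.out.println("per-buffer size: " + single); // 33554432 (32 MiB)

        long capacity = single;
        long threshold = 1024 * 1024; // example: flush once 1 MiB is buffered
        System.out.println(shouldFlush(512 * 1024, 4096, capacity, threshold));  // false
        System.out.println(shouldFlush(1024 * 1024, 4096, capacity, threshold)); // true
    }
}
```

   One design point to keep in mind: `writeAt` requires aligned offsets and lengths for O_DIRECT, so an early flush of a partially filled buffer would presumably have to write up to the next alignment boundary rather than the exact number of buffered bytes.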







[GitHub] [bookkeeper] mauricebarnum commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1012705882


   > I've waffled a bit on how far to collapse these commits.
   
   Here is the squashed version, complete except for the commit messages (I won't edit those unless this version is preferred): https://github.com/apache/bookkeeper/compare/master...mauricebarnum:directio-squashed?expand=1
   
   





[GitHub] [bookkeeper] mauricebarnum commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-990110895


   The new modules only build with Gradle; I haven't added Maven support yet.





[GitHub] [bookkeeper] mauricebarnum commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r830276230



##########
File path: bookkeeper-slogger/slf4j/build.gradle
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+plugins {
+    id 'java'
+}
+
+dependencies {
+    implementation project(':bookkeeper-slogger:api')
+    implementation depLibs.slf4j
+    testImplementation depLibs.slf4jSimple
+    testImplementation depLibs.junit
+}
+
+jar.archiveBaseName = 'bookkeeper-slogger-slf4j'

Review comment:
       I only overrode the name to avoid getting tripped up by the Log4Shell OWASP regex. In general, we don't include `org.apache.bookkeeper-` in the jar file names, or even bother to rename rather generic ones like `stream/clients/java/kv/build/libs/kv.jar`.
   
   ```
   ephemeral ~/src/bookkeeper-directio |> ls **/*.jar | wc -l
         40
   ephemeral ~/src/bookkeeper-directio |> ls **/*.jar | fgrep org.apache
   ```
   







[GitHub] [bookkeeper] eolivelli commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1072882245


   We should definitely split this patch into separate changes.
   I know it is hard, but it is worth it.





[GitHub] [bookkeeper] hangc0276 commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1024084199


   rerun failure checks





[GitHub] [bookkeeper] hangc0276 commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-1066974392


   ping @eolivelli, would you please help review this PR?





[GitHub] [bookkeeper] hangc0276 commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r826054869



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
##########
@@ -0,0 +1,508 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
+import org.apache.bookkeeper.bookie.AbstractLogCompactor;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
+import org.apache.bookkeeper.bookie.storage.EntryLogIds;
+import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLoggerIface;
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * DirectEntryLogger.
+ */
+public class DirectEntryLogger implements EntryLoggerIface {
+    private static final String LOGFILE_SUFFIX = ".log";
+    private final Slogger slog;
+    private final File ledgerDir;
+    private final EntryLogIds ids;
+    private final ExecutorService writeExecutor;
+    private final ExecutorService flushExecutor;
+    private final long maxFileSize;
+    private final DirectEntryLoggerStats stats;
+    private final ByteBufAllocator allocator;
+    private final BufferPool writeBuffers;
+    private final int readBufferSize;
+    private final int maxSaneEntrySize;
+    private final Set<Integer> unflushedLogs;
+
+    private WriterWithMetadata curWriter;
+
+    private List<Future<?>> pendingFlushes;
+    private final NativeIO nativeIO;
+    private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>();
+    private final ThreadLocal<Cache<Integer, LogReader>> caches;
+
+    private static final int NUMBER_OF_WRITE_BUFFERS = 8;
+
+    public DirectEntryLogger(File ledgerDir,
+                             EntryLogIds ids,
+                             NativeIO nativeIO,
+                             ByteBufAllocator allocator,
+                             ExecutorService writeExecutor,
+                             ExecutorService flushExecutor,
+                             long maxFileSize,
+                             int maxSaneEntrySize,
+                             long totalWriteBufferSize,
+                             long totalReadBufferSize,
+                             int readBufferSize,
+                             int numReadThreads,
+                             int maxFdCacheTimeSeconds,
+                             Slogger slogParent,
+                             StatsLogger stats) throws IOException {
+        this.ledgerDir = ledgerDir;
+        this.flushExecutor = flushExecutor;
+        this.writeExecutor = writeExecutor;
+        this.pendingFlushes = new ArrayList<>();
+        this.nativeIO = nativeIO;
+        this.unflushedLogs = ConcurrentHashMap.newKeySet();
+
+        this.maxFileSize = maxFileSize;
+        this.maxSaneEntrySize = maxSaneEntrySize;
+        this.readBufferSize = Buffer.nextAlignment(readBufferSize);
+        this.ids = ids;
+        this.slog = slogParent.kv("directory", ledgerDir).ctx();
+
+        this.stats = new DirectEntryLoggerStats(stats);
+
+        this.allocator = allocator;
+
+        int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
+        this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
+
+        // The total read buffer memory needs to get split across all the read threads, since the caches
+        // are thread-specific and we want to ensure we don't pass the total memory limit.
+        long perThreadBufferSize = totalReadBufferSize / numReadThreads;
+
+        // if the amount of total read buffer size is too low, and/or the number of read threads is too high
+        // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
+        // from the cache
+        if (perThreadBufferSize < readBufferSize) {
+            slog.kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
+                .kv("totalReadBufferSize", totalReadBufferSize)
+                .kv("totalNumReadThreads", numReadThreads)
+                .kv("readBufferSize", readBufferSize)
+                .kv("perThreadBufferSize", perThreadBufferSize)
+                .error(Events.ENTRYLOGGER_MISCONFIGURED);
+        }
+
+        long maxCachedReadersPerThread = perThreadBufferSize / readBufferSize;
+        long maxCachedReaders = maxCachedReadersPerThread * numReadThreads;
+
+        this.slog
+            .kv("maxFileSize", maxFileSize)
+            .kv("maxSaneEntrySize", maxSaneEntrySize)
+            .kv("totalWriteBufferSize", totalWriteBufferSize)
+            .kv("singleWriteBufferSize", singleWriteBufferSize)
+            .kv("totalReadBufferSize", totalReadBufferSize)
+            .kv("readBufferSize", readBufferSize)
+            .kv("perThreadBufferSize", perThreadBufferSize)
+            .kv("maxCachedReadersPerThread", maxCachedReadersPerThread)
+            .kv("maxCachedReaders", maxCachedReaders)
+            .info(Events.ENTRYLOGGER_CREATED);
+
+        this.caches = ThreadLocal.withInitial(() -> {
+            RemovalListener<Integer, LogReader> rl = (notification) -> {
+                try {
+                    notification.getValue().close();
+                    this.stats.getCloseReaderCounter().inc();
+                } catch (IOException ioe) {
+                    slog.kv("logID", notification.getKey()).error(Events.READER_CLOSE_ERROR);
+                }
+            };
+            Cache<Integer, LogReader> cache = CacheBuilder.newBuilder()
+                    .maximumWeight(perThreadBufferSize)
+                    .weigher((key, value) -> readBufferSize)
+                    .removalListener(rl)
+                    .expireAfterAccess(maxFdCacheTimeSeconds, TimeUnit.SECONDS)
+                    .concurrencyLevel(1) // important to avoid too aggressive eviction
+                    .build();
+            allCaches.add(cache);
+            return cache;
+        });
+    }
+
+    @Override
+    public long addEntry(long ledgerId, ByteBuf buf) throws IOException {
+        long start = System.nanoTime();
+        int size = buf.readableBytes();

Review comment:
       `size` is never used; remove it?

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java
##########
@@ -0,0 +1,318 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    final Object bufferLock = new Object();
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
+    Buffer nativeBuffer;
+    long offset;
+    private static volatile boolean useFallocate = true;
+
+    DirectWriter(int id,
+                 String filename,
+                 long maxFileSize,
+                 ExecutorService writeExecutor,
+                 BufferPool bufferPool,
+                 NativeIO nativeIO, Slogger slog) throws IOException {
+        checkArgument(maxFileSize > 0, "Max file size (%s) must be positive", maxFileSize);
+        this.id = id;
+        this.filename = filename;
+        this.writeExecutor = writeExecutor;
+        this.nativeIO = nativeIO;
+
+        offset = 0;
+
+        try {
+            fd = nativeIO.open(filename,
+                               NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
+                               00755);
+            checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage()).kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        }
+
+        if (useFallocate) {
+            if (!SystemUtils.IS_OS_LINUX) {
+                useFallocate = false;
+                slog.warn(Events.FALLOCATE_NOT_AVAILABLE);
+            } else {
+                try {
+                    int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize);
+                    checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret);
+                } catch (NativeIOException ex) {
+                    // fallocate(2) is not supported on all filesystems.  Since this is an optimization, disable
+                    // subsequent usage instead of failing the operation.
+                    useFallocate = false;
+                    slog.kv("message", ex.getMessage())
+                        .kv("file", filename)
+                        .kv("errno", ex.getErrno())
+                        .warn(Events.FALLOCATE_NOT_AVAILABLE);
+                }
+            }
+        }
+
+        this.bufferPool = bufferPool;
+        this.nativeBuffer = bufferPool.acquire();
+    }
+
+    @Override
+    public int logId() {
+        return id;
+    }
+
+    @Override
+    public void writeAt(long offset, ByteBuf buf) throws IOException {
+        checkArgument(Buffer.isAligned(offset),
+                      "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset);
+        checkArgument(Buffer.isAligned(buf.readableBytes()),
+                      "Buffer must write multiple of alignment bytes (%d), %d is not",
+                      Buffer.ALIGNMENT, buf.readableBytes());
+        Buffer tmpBuffer = bufferPool.acquire();
+        int bytesToWrite = buf.readableBytes();
+        tmpBuffer.reset();
+        tmpBuffer.writeByteBuf(buf);
+        Future<?> f = writeExecutor.submit(() -> {
+                try {
+                    int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset);
+                    if (ret != bytesToWrite) {
+                        throw new IOException(exMsg("Incomplete write")
+                                              .kv("filename", filename)
+                                              .kv("writeSize", bytesToWrite)
+                                              .kv("bytesWritten", ret)
+                                              .kv("offset", offset).toString());
+                    }
+                } catch (NativeIOException ne) {
+                    throw new IOException(exMsg("Write error")
+                                          .kv("filename", filename)
+                                          .kv("writeSize", bytesToWrite)
+                                          .kv("errno", ne.getErrno())
+                                          .kv("offset", offset).toString());
+                } finally {
+                    bufferPool.release(tmpBuffer);
+                }
+                return null;
+            });
+        addOutstandingWrite(f);
+    }
+
+    @Override
+    public int writeDelimited(ByteBuf buf) throws IOException {
+        synchronized (bufferLock) {
+            if (!nativeBuffer.hasSpace(serializedSize(buf))) {
+                flushBuffer();
+            }
+
+            int readable = buf.readableBytes();
+            long bufferPosition = position() + Integer.BYTES;
+            if (bufferPosition > Integer.MAX_VALUE) {
+                throw new IOException(exMsg("Cannot write past max int")
+                                      .kv("filename", filename)
+                                      .kv("writeSize", readable)
+                                      .kv("position", bufferPosition)
+                                      .toString());
+            }
+            nativeBuffer.writeInt(readable);
+            nativeBuffer.writeByteBuf(buf);
+            return (int) bufferPosition;
+        }
+    }
+
+    @Override
+    public void position(long offset) throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+            if ((offset % Buffer.ALIGNMENT) != 0) {
+                throw new IOException(exMsg("offset must be multiple of alignment")
+                                      .kv("offset", offset)
+                                      .kv("alignment", Buffer.ALIGNMENT)
+                                      .toString());
+            }
+            this.offset = offset;
+        }
+    }
+
+    @Override
+    public long position() {
+        synchronized (bufferLock) {
+            return this.offset + (nativeBuffer != null ? nativeBuffer.position() : 0);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+
+        waitForOutstandingWrites();
+
+        try {
+            int ret = nativeIO.fsync(fd);
+            checkState(ret == 0, "Fsync should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+        }
+
+        try {
+            int ret = nativeIO.close(fd);
+            checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString());
+        }
+        synchronized (bufferLock) {
+            bufferPool.release(nativeBuffer);
+            nativeBuffer = null;
+        }
+    }
+
+    private void addOutstandingWrite(Future<?> toAdd) throws IOException {
+        synchronized (outstandingWrites) {
+            outstandingWrites.add(toAdd);
+
+            Iterator<Future<?>> iter = outstandingWrites.iterator();
+            while (iter.hasNext()) { // clear out completed futures
+                Future<?> f = iter.next();
+                if (f.isDone()) {
+                    waitForFuture(f);
+                    iter.remove();
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    private void waitForOutstandingWrites() throws IOException {
+        synchronized (outstandingWrites) {
+            Iterator<Future<?>> iter = outstandingWrites.iterator();
+            while (iter.hasNext()) { // clear out completed futures
+                Future<?> f = iter.next();
+                waitForFuture(f);
+                iter.remove();
+            }
+        }
+    }
+
+    private void waitForFuture(Future<?> f) throws IOException {
+        try {
+            f.get();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException(ie);
+        } catch (Throwable t) {
+            if (t.getCause() instanceof IOException) {
+                throw (IOException) t.getCause();
+            } else {
+                throw new IOException(t);
+            }
+        }
+    }
+
+    private void flushBuffer() throws IOException {
+        synchronized (bufferLock) {
+            Buffer bufferToFlush = nativeBuffer;
+            this.nativeBuffer = null;
+            if (bufferToFlush != null) {
+                int bytesToWrite = bufferToFlush.padToAlignment();
+                long offsetToWrite = offset;
+                offset += bytesToWrite;
+                Future<?> f = writeExecutor.submit(() -> {
+                        if (bufferToFlush == null) {

Review comment:
       Do we need this double check?

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
##########
@@ -0,0 +1,508 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
+import org.apache.bookkeeper.bookie.AbstractLogCompactor;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
+import org.apache.bookkeeper.bookie.storage.EntryLogIds;
+import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLoggerIface;
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * DirectEntryLogger.
+ */
+public class DirectEntryLogger implements EntryLoggerIface {
+    private static final String LOGFILE_SUFFIX = ".log";
+    private final Slogger slog;
+    private final File ledgerDir;
+    private final EntryLogIds ids;
+    private final ExecutorService writeExecutor;
+    private final ExecutorService flushExecutor;
+    private final long maxFileSize;
+    private final DirectEntryLoggerStats stats;
+    private final ByteBufAllocator allocator;
+    private final BufferPool writeBuffers;
+    private final int readBufferSize;
+    private final int maxSaneEntrySize;
+    private final Set<Integer> unflushedLogs;
+
+    private WriterWithMetadata curWriter;
+
+    private List<Future<?>> pendingFlushes;
+    private final NativeIO nativeIO;
+    private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>();
+    private final ThreadLocal<Cache<Integer, LogReader>> caches;
+
+    private static final int NUMBER_OF_WRITE_BUFFERS = 8;
+
+    public DirectEntryLogger(File ledgerDir,
+                             EntryLogIds ids,
+                             NativeIO nativeIO,
+                             ByteBufAllocator allocator,
+                             ExecutorService writeExecutor,
+                             ExecutorService flushExecutor,
+                             long maxFileSize,
+                             int maxSaneEntrySize,
+                             long totalWriteBufferSize,
+                             long totalReadBufferSize,
+                             int readBufferSize,
+                             int numReadThreads,
+                             int maxFdCacheTimeSeconds,
+                             Slogger slogParent,
+                             StatsLogger stats) throws IOException {
+        this.ledgerDir = ledgerDir;
+        this.flushExecutor = flushExecutor;
+        this.writeExecutor = writeExecutor;
+        this.pendingFlushes = new ArrayList<>();
+        this.nativeIO = nativeIO;
+        this.unflushedLogs = ConcurrentHashMap.newKeySet();
+
+        this.maxFileSize = maxFileSize;
+        this.maxSaneEntrySize = maxSaneEntrySize;
+        this.readBufferSize = Buffer.nextAlignment(readBufferSize);
+        this.ids = ids;
+        this.slog = slogParent.kv("directory", ledgerDir).ctx();
+
+        this.stats = new DirectEntryLoggerStats(stats);
+
+        this.allocator = allocator;
+
+        int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
+        this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
+
+        // The total read buffer memory needs to get split across all the read threads, since the caches
+        // are thread-specific and we want to ensure we don't pass the total memory limit.
+        long perThreadBufferSize = totalReadBufferSize / numReadThreads;
+
+        // if the amount of total read buffer size is too low, and/or the number of read threads is too high
+        // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
+        // from the cache
+        if (perThreadBufferSize < readBufferSize) {

Review comment:
       Should this use the aligned `this.readBufferSize` instead of the original `readBufferSize`?
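       The concern is that the constructor rounds `this.readBufferSize` up with `Buffer.nextAlignment()`, so the guard compares the per-thread budget against a smaller number than the buffer size readers presumably use. Below is a minimal sketch of a case where the two checks disagree. It assumes `nextAlignment` rounds up to a multiple of a 4096-byte alignment; `ALIGNMENT`, `nextAlignment` and `tooSmall` are hypothetical stand-ins, not the PR's `Buffer` class.

```java
// Hypothetical sketch: ALIGNMENT and nextAlignment() stand in for the PR's
// Buffer.ALIGNMENT and Buffer.nextAlignment(); the 4096-byte value is an assumption.
class ReadBufferSizeCheck {
    static final int ALIGNMENT = 4096;

    // Round size up to the next multiple of ALIGNMENT (assumed behavior of Buffer.nextAlignment).
    static int nextAlignment(int size) {
        return ((size + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
    }

    // True when the per-thread budget cannot hold even one aligned read buffer.
    static boolean tooSmall(long totalReadBufferSize, int numReadThreads, int requestedReadBufferSize) {
        int alignedReadBufferSize = nextAlignment(requestedReadBufferSize);
        long perThreadBufferSize = totalReadBufferSize / numReadThreads;
        return perThreadBufferSize < alignedReadBufferSize;
    }

    public static void main(String[] args) {
        // 5000 bytes requested becomes 8192 after alignment. A 6000-byte per-thread
        // budget passes a check against the raw size but fails against the aligned size.
        long total = 6000L * 4;
        int threads = 4;
        int requested = 5000;
        System.out.println("raw check:     " + (total / threads < requested)); // prints false
        System.out.println("aligned check: " + tooSmall(total, threads, requested)); // prints true
    }
}
```

       With the raw value, a budget that sits between the requested and the aligned size slips past the guard, which is the immediate-eviction case the code comment above it describes.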

##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
##########
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+
+/**
+ * BufferPool.
+ */
+public class BufferPool implements AutoCloseable {
+    private final int maxPoolSize;

Review comment:
       The field `maxPoolSize` is never used; could we remove it?
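       Since the pool is backed by an `ArrayBlockingQueue`, the queue's fixed capacity already bounds the pool, which is presumably why a separate size field is redundant. A minimal sketch of that shape, assuming direct `java.nio.ByteBuffer`s in place of the PR's native `Buffer` type (`SimpleBufferPool` and its methods are hypothetical, not the PR's `BufferPool`):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch: direct ByteBuffers stand in for the PR's native Buffer type.
class SimpleBufferPool implements AutoCloseable {
    // The queue's fixed capacity bounds the pool, so no separate maxPoolSize field is kept.
    private final ArrayBlockingQueue<ByteBuffer> pool;

    SimpleBufferPool(int bufferSize, int poolSize) {
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // Blocks until another caller releases a buffer.
    ByteBuffer acquire() {
        try {
            ByteBuffer buf = pool.take();
            buf.clear();
            return buf;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a buffer", ie);
        }
    }

    void release(ByteBuffer buf) {
        // offer() refuses rather than growing the pool past its capacity.
        if (!pool.offer(buf)) {
            throw new IllegalStateException("Released more buffers than the pool holds");
        }
    }

    int available() {
        return pool.size();
    }

    @Override
    public void close() {
        pool.clear();
    }
}
```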




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bookkeeper] mauricebarnum commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r830228309



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java
##########
@@ -0,0 +1,318 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    final Object bufferLock = new Object();
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
+    Buffer nativeBuffer;
+    long offset;
+    private static volatile boolean useFallocate = true;
+
+    DirectWriter(int id,
+                 String filename,
+                 long maxFileSize,
+                 ExecutorService writeExecutor,
+                 BufferPool bufferPool,
+                 NativeIO nativeIO, Slogger slog) throws IOException {
+        checkArgument(maxFileSize > 0, "Max file size (%d) must be positive");
+        this.id = id;
+        this.filename = filename;
+        this.writeExecutor = writeExecutor;
+        this.nativeIO = nativeIO;
+
+        offset = 0;
+
+        try {
+            fd = nativeIO.open(filename,
+                               NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
+                               00755);
+            checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage()).kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        }
+
+        if (useFallocate) {
+            if (!SystemUtils.IS_OS_LINUX) {
+                useFallocate = false;
+                slog.warn(Events.FALLOCATE_NOT_AVAILABLE);
+            } else {
+                try {
+                    int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize);
+                    checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret);
+                } catch (NativeIOException ex) {
+                    // fallocate(2) is not supported on all filesystems.  Since this is an optimization, disable
+                    // subsequent usage instead of failing the operation.
+                    useFallocate = false;
+                    slog.kv("message", ex.getMessage())
+                        .kv("file", filename)
+                        .kv("errno", ex.getErrno())
+                        .warn(Events.FALLOCATE_NOT_AVAILABLE);
+                }
+            }
+        }
+
+        this.bufferPool = bufferPool;
+        this.nativeBuffer = bufferPool.acquire();
+    }
+
+    @Override
+    public int logId() {
+        return id;
+    }
+
+    @Override
+    public void writeAt(long offset, ByteBuf buf) throws IOException {
+        checkArgument(Buffer.isAligned(offset),
+                      "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset);
+        checkArgument(Buffer.isAligned(buf.readableBytes()),
+                      "Buffer must write multiple of alignment bytes (%d), %d is not",
+                      Buffer.ALIGNMENT, buf.readableBytes());
+        Buffer tmpBuffer = bufferPool.acquire();
+        int bytesToWrite = buf.readableBytes();
+        tmpBuffer.reset();
+        tmpBuffer.writeByteBuf(buf);
+        Future<?> f = writeExecutor.submit(() -> {
+                try {
+                    int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset);
+                    if (ret != bytesToWrite) {
+                        throw new IOException(exMsg("Incomplete write")
+                                              .kv("filename", filename)
+                                              .kv("writeSize", bytesToWrite)
+                                              .kv("bytesWritten", ret)
+                                              .kv("offset", offset).toString());
+                    }
+                } catch (NativeIOException ne) {
+                    throw new IOException(exMsg("Write error")
+                                          .kv("filename", filename)
+                                          .kv("writeSize", bytesToWrite)
+                                          .kv("errno", ne.getErrno())
+                                          .kv("offset", offset).toString());
+                } finally {
+                    bufferPool.release(tmpBuffer);
+                }
+                return null;
+            });
+        addOutstandingWrite(f);
+    }
+
+    @Override
+    public int writeDelimited(ByteBuf buf) throws IOException {
+        synchronized (bufferLock) {
+            if (!nativeBuffer.hasSpace(serializedSize(buf))) {
+                flushBuffer();
+            }
+
+            int readable = buf.readableBytes();
+            long bufferPosition = position() + Integer.BYTES;
+            if (bufferPosition > Integer.MAX_VALUE) {
+                throw new IOException(exMsg("Cannot write past max int")
+                                      .kv("filename", filename)
+                                      .kv("writeSize", readable)
+                                      .kv("position", bufferPosition)
+                                      .toString());
+            }
+            nativeBuffer.writeInt(readable);
+            nativeBuffer.writeByteBuf(buf);
+            return (int) bufferPosition;
+        }
+    }
+
+    @Override
+    public void position(long offset) throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+            if ((offset % Buffer.ALIGNMENT) != 0) {
+                throw new IOException(exMsg("offset must be multiple of alignment")
+                                      .kv("offset", offset)
+                                      .kv("alignment", Buffer.ALIGNMENT)
+                                      .toString());
+            }
+            this.offset = offset;
+        }
+    }
+
+    @Override
+    public long position() {
+        synchronized (bufferLock) {
+            return this.offset + (nativeBuffer != null ? nativeBuffer.position() : 0);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+
+        waitForOutstandingWrites();
+
+        try {
+            int ret = nativeIO.fsync(fd);
+            checkState(ret == 0, "Fsync should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+        }
+
+        try {
+            int ret = nativeIO.close(fd);
+            checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString());
+        }
+        synchronized (bufferLock) {
+            bufferPool.release(nativeBuffer);
+            nativeBuffer = null;
+        }
+    }
+
+    private void addOutstandingWrite(Future<?> toAdd) throws IOException {
+        synchronized (outstandingWrites) {
+            outstandingWrites.add(toAdd);
+
+            Iterator<Future<?>> iter = outstandingWrites.iterator();
+            while (iter.hasNext()) { // clear out completed futures
+                Future<?> f = iter.next();
+                if (f.isDone()) {
+                    waitForFuture(f);
+                    iter.remove();
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    private void waitForOutstandingWrites() throws IOException {
+        synchronized (outstandingWrites) {
+            Iterator<Future<?>> iter = outstandingWrites.iterator();
+            while (iter.hasNext()) { // clear out completed futures
+                Future<?> f = iter.next();
+                waitForFuture(f);
+                iter.remove();
+            }
+        }
+    }
+
+    private void waitForFuture(Future<?> f) throws IOException {
+        try {
+            f.get();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException(ie);
+        } catch (Throwable t) {
+            if (t.getCause() instanceof IOException) {
+                throw (IOException) t.getCause();
+            } else {
+                throw new IOException(t);
+            }
+        }
+    }
+
+    private void flushBuffer() throws IOException {
+        synchronized (bufferLock) {
+            Buffer bufferToFlush = nativeBuffer;
+            this.nativeBuffer = null;
+            if (bufferToFlush != null) {
+                int bytesToWrite = bufferToFlush.padToAlignment();
+                long offsetToWrite = offset;
+                offset += bytesToWrite;
+                Future<?> f = writeExecutor.submit(() -> {
+                        if (bufferToFlush == null) {

Review comment:
       It doesn't look like it. I'll move `bufferToFlush` inside the conditional block as well. Something I'm not going to change yet, but am thinking about: can the allocation of a new buffer be moved to where `nativeBuffer` is currently set to null? As it stands, we can potentially exit this method with a null `nativeBuffer` if `addOutstandingWrite` throws an exception.
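       The restructuring described in this reply can be sketched as follows. This is a simplified, hypothetical model, not the PR's code: a `StringBuilder` stands in for the native `Buffer`, the replacement buffer is created directly rather than taken from `BufferPool`, alignment padding is omitted, and the write task records `offset:contents` instead of calling `pwrite`. It shows the two points under discussion: the captured `bufferToFlush` needs no null check inside the task, and swapping in the replacement buffer before submitting means `nativeBuffer` is never left null if a later step throws.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the flushBuffer() shape discussed above. StringBuilder stands in
// for the native Buffer, and a "write" records (offset, contents) instead of calling pwrite().
class FlushSketch {
    private final Object bufferLock = new Object();
    private final ExecutorService writeExecutor = Executors.newSingleThreadExecutor();
    private final List<Future<?>> outstandingWrites = new ArrayList<>();
    final List<String> written = Collections.synchronizedList(new ArrayList<>());
    private StringBuilder nativeBuffer = new StringBuilder();
    private long offset = 0;

    void write(String data) {
        synchronized (bufferLock) {
            nativeBuffer.append(data);
        }
    }

    void flushBuffer() {
        synchronized (bufferLock) {
            if (nativeBuffer.length() == 0) {
                return;
            }
            // Swap in the replacement buffer first, so a failure later in this
            // method cannot leave nativeBuffer null.
            final StringBuilder bufferToFlush = nativeBuffer;
            nativeBuffer = new StringBuilder();
            final long offsetToWrite = offset;
            offset += bufferToFlush.length();
            // bufferToFlush is a non-null, effectively final local, so the task needs no null check.
            Future<?> f = writeExecutor.submit(() -> written.add(offsetToWrite + ":" + bufferToFlush));
            synchronized (outstandingWrites) {
                outstandingWrites.add(f);
            }
        }
    }

    // Flush, then wait for every submitted write, rethrowing failures unchecked.
    void flushAndWait() {
        flushBuffer();
        synchronized (outstandingWrites) {
            for (Future<?> f : outstandingWrites) {
                try {
                    f.get();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(ie);
                } catch (ExecutionException ee) {
                    throw new IllegalStateException(ee.getCause());
                }
            }
            outstandingWrites.clear();
        }
    }

    void close() {
        writeExecutor.shutdown();
    }
}
```

       One trade-off to keep in mind: with a bounded pool like `BufferPool`, taking the replacement before submitting means the writer briefly holds one extra buffer, so `acquire()` could wait until an in-flight write releases one.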







[GitHub] [bookkeeper] jvrao commented on pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
jvrao commented on pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#issuecomment-995264830


   Looking forward to this change.





[GitHub] [bookkeeper] mauricebarnum commented on a change in pull request #2932: direct-io: add support for bypassing operating system I/O cache when logging entries

Posted by GitBox <gi...@apache.org>.
mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r780096752



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
##########
@@ -77,6 +83,18 @@
     public static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
 
     public static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+    public static final String DIRECT_IO_ENTRYLOGGER = "dbStorage_directIOEntryLogger";
+    public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB =
+            "dbStorage_directIOEntryLoggerTotalWriteBufferSizeMb";
+    public static final String DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB =
+            "dbStorage_directIOEntryLoggerReadBufferSizeMb";

Review comment:
       Good catch. I'll fix it.



