You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2022/05/11 00:47:05 UTC

[GitHub] [bookkeeper] dlg99 commented on a diff in pull request #3263: BP-47 (task6): Direct I/O entrylogger support

dlg99 commented on code in PR #3263:
URL: https://github.com/apache/bookkeeper/pull/3263#discussion_r869651292


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogIdsImpl.java:
##########
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.directentrylogger.Events;
+import org.apache.bookkeeper.slogger.Slogger;
+
+/**
+ * EntryLogIdsImpl.
+ */
+public class EntryLogIdsImpl implements EntryLogIds {
+    public static final Pattern FILE_PATTERN = Pattern.compile("^([0-9a-fA-F]+)\\.log$");
+    public static final Pattern COMPACTED_FILE_PATTERN =
+            Pattern.compile("^([0-9a-fA-F]+)\\.log\\.([0-9a-fA-F]+)\\.compacted$");
+
+    private final LedgerDirsManager ledgerDirsManager;
+    private final Slogger slog;
+    private int nextId;
+    private int maxId;
+
+    public EntryLogIdsImpl(LedgerDirsManager ledgerDirsManager,
+                           Slogger slog) throws IOException {
+        this.ledgerDirsManager = ledgerDirsManager;
+        this.slog = slog;
+        findLargestGap();
+    }
+
+    @Override
+    public int nextId() throws IOException {
+        while (true) {
+            synchronized (this) {
+                int current = nextId;
+                nextId++;
+                if (nextId == maxId) {
+                    findLargestGap();
+                } else {
+                    return current;
+                }
+            }
+        }
+    }
+
+    private void findLargestGap() throws IOException {
+        long start = System.nanoTime();
+        List<Integer> currentIds = new ArrayList<Integer>();
+
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            currentIds.addAll(logIdsInDirectory(ledgerDir));
+            currentIds.addAll(compactedLogIdsInDirectory(ledgerDir));
+        }
+
+        int[] gap = findLargestGap(currentIds);
+        nextId = gap[0];
+        maxId = gap[1];
+        slog.kv("dirs", ledgerDirsManager.getAllLedgerDirs())
+            .kv("nextId", nextId)
+            .kv("maxId", maxId)
+            .kv("durationMs", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
+            .info(Events.ENTRYLOG_IDS_CANDIDATES_SELECTED);
+    }
+
+    /**
+     * O(nlogn) algorithm to find largest contiguous gap between
+     * integers in a passed list. n should be relatively small.
+     * Entry logs should be about 1GB in size, so even if the node

Review Comment:
   This is not guaranteed when entry-log-per-ledger is enabled: entry logs can then be much smaller than 1GB, so there may be far more of them.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java:
##########
@@ -0,0 +1,322 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    final Object bufferLock = new Object();
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
+    Buffer nativeBuffer;
+    long offset;
+    private volatile boolean useFallocate = true;
+
+    DirectWriter(int id,
+                 String filename,
+                 long maxFileSize,
+                 ExecutorService writeExecutor,
+                 BufferPool bufferPool,
+                 NativeIO nativeIO, Slogger slog) throws IOException {
+        checkArgument(maxFileSize > 0, "Max file size (%d) must be positive");
+        this.id = id;
+        this.filename = filename;
+        this.writeExecutor = writeExecutor;
+        this.nativeIO = nativeIO;
+
+        offset = 0;
+
+        try {
+            fd = nativeIO.open(filename,
+                               NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
+                               00755);

Review Comment:
   `00755` as in permissions? Why 755 and not, e.g., 600 or 640?
   "755 means read and execute access for everyone and also write access for the owner of the file."
   Why are we granting execute access to this file?
   Why does everyone need read access?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java:
##########
@@ -0,0 +1,322 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    final Object bufferLock = new Object();
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
+    Buffer nativeBuffer;
+    long offset;
+    private volatile boolean useFallocate = true;
+
+    DirectWriter(int id,
+                 String filename,
+                 long maxFileSize,
+                 ExecutorService writeExecutor,
+                 BufferPool bufferPool,
+                 NativeIO nativeIO, Slogger slog) throws IOException {
+        checkArgument(maxFileSize > 0, "Max file size (%d) must be positive");
+        this.id = id;
+        this.filename = filename;
+        this.writeExecutor = writeExecutor;
+        this.nativeIO = nativeIO;
+
+        offset = 0;
+
+        try {
+            fd = nativeIO.open(filename,
+                               NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
+                               00755);
+            checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage()).kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        }
+
+        if (useFallocate) {
+            if (!SystemUtils.IS_OS_LINUX) {
+                useFallocate = false;
+                slog.warn(Events.FALLOCATE_NOT_AVAILABLE);
+            } else {
+                try {
+                    int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize);
+                    checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret);
+                } catch (NativeIOException ex) {
+                    // fallocate(2) is not supported on all filesystems.  Since this is an optimization, disable
+                    // subsequent usage instead of failing the operation.
+                    useFallocate = false;
+                    slog.kv("message", ex.getMessage())
+                        .kv("file", filename)
+                        .kv("errno", ex.getErrno())
+                        .warn(Events.FALLOCATE_NOT_AVAILABLE);
+                }
+            }
+        }
+
+        this.bufferPool = bufferPool;
+        this.nativeBuffer = bufferPool.acquire();
+    }
+
+    @Override
+    public int logId() {
+        return id;
+    }
+
+    @Override
+    public void writeAt(long offset, ByteBuf buf) throws IOException {
+        checkArgument(Buffer.isAligned(offset),
+                      "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset);
+        checkArgument(Buffer.isAligned(buf.readableBytes()),
+                      "Buffer must write multiple of alignment bytes (%d), %d is not",
+                      Buffer.ALIGNMENT, buf.readableBytes());
+        Buffer tmpBuffer = bufferPool.acquire();
+        int bytesToWrite = buf.readableBytes();
+        tmpBuffer.reset();
+        tmpBuffer.writeByteBuf(buf);
+        Future<?> f = writeExecutor.submit(() -> {
+                try {
+                    int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset);
+                    if (ret != bytesToWrite) {
+                        throw new IOException(exMsg("Incomplete write")
+                                              .kv("filename", filename)
+                                              .kv("writeSize", bytesToWrite)
+                                              .kv("bytesWritten", ret)
+                                              .kv("offset", offset).toString());
+                    }
+                } catch (NativeIOException ne) {
+                    throw new IOException(exMsg("Write error")
+                                          .kv("filename", filename)
+                                          .kv("writeSize", bytesToWrite)
+                                          .kv("errno", ne.getErrno())
+                                          .kv("offset", offset).toString());
+                } finally {
+                    bufferPool.release(tmpBuffer);
+                }
+                return null;
+            });
+        addOutstandingWrite(f);
+    }
+
+    @Override
+    public int writeDelimited(ByteBuf buf) throws IOException {
+        synchronized (bufferLock) {
+            if (!nativeBuffer.hasSpace(serializedSize(buf))) {
+                flushBuffer();
+            }
+
+            int readable = buf.readableBytes();
+            long bufferPosition = position() + Integer.BYTES;
+            if (bufferPosition > Integer.MAX_VALUE) {
+                throw new IOException(exMsg("Cannot write past max int")
+                                      .kv("filename", filename)
+                                      .kv("writeSize", readable)
+                                      .kv("position", bufferPosition)
+                                      .toString());
+            }
+            nativeBuffer.writeInt(readable);
+            nativeBuffer.writeByteBuf(buf);
+            return (int) bufferPosition;
+        }
+    }
+
+    @Override
+    public void position(long offset) throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+            if ((offset % Buffer.ALIGNMENT) != 0) {
+                throw new IOException(exMsg("offset must be multiple of alignment")
+                                      .kv("offset", offset)
+                                      .kv("alignment", Buffer.ALIGNMENT)
+                                      .toString());
+            }
+            this.offset = offset;
+        }
+    }
+
+    @Override
+    public long position() {
+        synchronized (bufferLock) {
+            return this.offset + (nativeBuffer != null ? nativeBuffer.position() : 0);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+
+        waitForOutstandingWrites();
+
+        try {
+            int ret = nativeIO.fsync(fd);
+            checkState(ret == 0, "Fsync should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+        }
+

Review Comment:
   I think you should call `waitForOutstandingWrites();` here, or call `flush()` instead of `flushBuffer()`, to confirm that the last outstanding write (e.g. one submitted by `flushBuffer()`) actually succeeded.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogIdsImpl.java:
##########
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.directentrylogger.Events;
+import org.apache.bookkeeper.slogger.Slogger;
+
+/**
+ * EntryLogIdsImpl.
+ */
+public class EntryLogIdsImpl implements EntryLogIds {
+    public static final Pattern FILE_PATTERN = Pattern.compile("^([0-9a-fA-F]+)\\.log$");
+    public static final Pattern COMPACTED_FILE_PATTERN =
+            Pattern.compile("^([0-9a-fA-F]+)\\.log\\.([0-9a-fA-F]+)\\.compacted$");
+
+    private final LedgerDirsManager ledgerDirsManager;
+    private final Slogger slog;
+    private int nextId;
+    private int maxId;
+
+    public EntryLogIdsImpl(LedgerDirsManager ledgerDirsManager,
+                           Slogger slog) throws IOException {
+        this.ledgerDirsManager = ledgerDirsManager;
+        this.slog = slog;
+        findLargestGap();
+    }
+
+    @Override
+    public int nextId() throws IOException {
+        while (true) {
+            synchronized (this) {
+                int current = nextId;
+                nextId++;
+                if (nextId == maxId) {
+                    findLargestGap();
+                } else {
+                    return current;
+                }
+            }
+        }
+    }
+
+    private void findLargestGap() throws IOException {
+        long start = System.nanoTime();
+        List<Integer> currentIds = new ArrayList<Integer>();
+
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            currentIds.addAll(logIdsInDirectory(ledgerDir));
+            currentIds.addAll(compactedLogIdsInDirectory(ledgerDir));
+        }
+
+        int[] gap = findLargestGap(currentIds);
+        nextId = gap[0];
+        maxId = gap[1];
+        slog.kv("dirs", ledgerDirsManager.getAllLedgerDirs())
+            .kv("nextId", nextId)
+            .kv("maxId", maxId)
+            .kv("durationMs", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
+            .info(Events.ENTRYLOG_IDS_CANDIDATES_SELECTED);
+    }
+
+    /**
+     * O(nlogn) algorithm to find largest contiguous gap between
+     * integers in a passed list. n should be relatively small.
+     * Entry logs should be about 1GB in size, so even if the node
+     * stores a PB, there should be only 1000000 entry logs.
+     */
+    static int[] findLargestGap(List<Integer> currentIds) {

Review Comment:
   Use `Pair<>` instead of `int[]`, because that is what it represents.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java:
##########
@@ -0,0 +1,322 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    final Object bufferLock = new Object();
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
+    Buffer nativeBuffer;
+    long offset;
+    private volatile boolean useFallocate = true;
+
+    DirectWriter(int id,
+                 String filename,
+                 long maxFileSize,
+                 ExecutorService writeExecutor,
+                 BufferPool bufferPool,
+                 NativeIO nativeIO, Slogger slog) throws IOException {
+        checkArgument(maxFileSize > 0, "Max file size (%d) must be positive");
+        this.id = id;
+        this.filename = filename;
+        this.writeExecutor = writeExecutor;
+        this.nativeIO = nativeIO;
+
+        offset = 0;
+
+        try {
+            fd = nativeIO.open(filename,
+                               NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
+                               00755);
+            checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage()).kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        }
+
+        if (useFallocate) {
+            if (!SystemUtils.IS_OS_LINUX) {
+                useFallocate = false;
+                slog.warn(Events.FALLOCATE_NOT_AVAILABLE);
+            } else {
+                try {
+                    int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize);
+                    checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret);
+                } catch (NativeIOException ex) {
+                    // fallocate(2) is not supported on all filesystems.  Since this is an optimization, disable
+                    // subsequent usage instead of failing the operation.
+                    useFallocate = false;
+                    slog.kv("message", ex.getMessage())
+                        .kv("file", filename)
+                        .kv("errno", ex.getErrno())
+                        .warn(Events.FALLOCATE_NOT_AVAILABLE);
+                }
+            }
+        }
+
+        this.bufferPool = bufferPool;
+        this.nativeBuffer = bufferPool.acquire();
+    }
+
+    @Override
+    public int logId() {
+        return id;
+    }
+
+    @Override
+    public void writeAt(long offset, ByteBuf buf) throws IOException {
+        checkArgument(Buffer.isAligned(offset),
+                      "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset);
+        checkArgument(Buffer.isAligned(buf.readableBytes()),
+                      "Buffer must write multiple of alignment bytes (%d), %d is not",
+                      Buffer.ALIGNMENT, buf.readableBytes());
+        Buffer tmpBuffer = bufferPool.acquire();
+        int bytesToWrite = buf.readableBytes();
+        tmpBuffer.reset();
+        tmpBuffer.writeByteBuf(buf);
+        Future<?> f = writeExecutor.submit(() -> {
+                try {
+                    int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset);
+                    if (ret != bytesToWrite) {
+                        throw new IOException(exMsg("Incomplete write")
+                                              .kv("filename", filename)
+                                              .kv("writeSize", bytesToWrite)
+                                              .kv("bytesWritten", ret)
+                                              .kv("offset", offset).toString());
+                    }
+                } catch (NativeIOException ne) {
+                    throw new IOException(exMsg("Write error")
+                                          .kv("filename", filename)
+                                          .kv("writeSize", bytesToWrite)
+                                          .kv("errno", ne.getErrno())
+                                          .kv("offset", offset).toString());
+                } finally {
+                    bufferPool.release(tmpBuffer);
+                }
+                return null;
+            });
+        addOutstandingWrite(f);
+    }
+
+    @Override
+    public int writeDelimited(ByteBuf buf) throws IOException {
+        synchronized (bufferLock) {
+            if (!nativeBuffer.hasSpace(serializedSize(buf))) {
+                flushBuffer();
+            }
+
+            int readable = buf.readableBytes();
+            long bufferPosition = position() + Integer.BYTES;
+            if (bufferPosition > Integer.MAX_VALUE) {
+                throw new IOException(exMsg("Cannot write past max int")
+                                      .kv("filename", filename)
+                                      .kv("writeSize", readable)
+                                      .kv("position", bufferPosition)
+                                      .toString());
+            }
+            nativeBuffer.writeInt(readable);
+            nativeBuffer.writeByteBuf(buf);
+            return (int) bufferPosition;
+        }
+    }
+
+    @Override
+    public void position(long offset) throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+            if ((offset % Buffer.ALIGNMENT) != 0) {
+                throw new IOException(exMsg("offset must be multiple of alignment")
+                                      .kv("offset", offset)
+                                      .kv("alignment", Buffer.ALIGNMENT)
+                                      .toString());
+            }
+            this.offset = offset;
+        }
+    }
+
+    @Override
+    public long position() {
+        synchronized (bufferLock) {
+            return this.offset + (nativeBuffer != null ? nativeBuffer.position() : 0);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+
+        waitForOutstandingWrites();
+
+        try {
+            int ret = nativeIO.fsync(fd);
+            checkState(ret == 0, "Fsync should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+        }
+
+        try {
+            int ret = nativeIO.close(fd);
+            checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString());
+        }
+        synchronized (bufferLock) {
+            bufferPool.release(nativeBuffer);
+            nativeBuffer = null;
+        }
+    }
+
+    private void addOutstandingWrite(Future<?> toAdd) throws IOException {
+        synchronized (outstandingWrites) {
+            outstandingWrites.add(toAdd);

Review Comment:
   There is no limit on the number of outstandingWrites.
   Typically we start without one and end up adding some kind of limiter or throttler to prevent system overload.
   Does it make sense to add it right now?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java:
##########
@@ -0,0 +1,322 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    final Object bufferLock = new Object();
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();

Review Comment:
   Can it be a ConcurrentLinkedQueue to avoid synchronization later?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java:
##########
@@ -0,0 +1,322 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.common.util.nativeio.NativeIOException;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.commons.lang3.SystemUtils;
+
+class DirectWriter implements LogWriter {
+    final NativeIO nativeIO;
+    final int fd;
+    final int id;
+    final String filename;
+    final BufferPool bufferPool;
+    final ExecutorService writeExecutor;
+    // Monitor guarding nativeBuffer and offset.
+    final Object bufferLock = new Object();
+    // Futures of submitted pwrite tasks in submission order; guarded by synchronizing on the list itself.
+    final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>();
+    // Staging buffer used by writeDelimited; close() releases it and sets it to null.
+    Buffer nativeBuffer;
+    // File offset at which the current contents of nativeBuffer will be written.
+    long offset;
+    // Static so that once fallocate(2) is found to be unavailable (non-Linux, or unsupported by the
+    // filesystem) later writers skip it. As an instance field it was always true when checked in the
+    // constructor, so the "disable subsequent usage" fallback could never take effect.
+    private static volatile boolean useFallocate = true;
+
+    /**
+     * Opens (creating if necessary) {@code filename} for O_DIRECT writes and acquires a staging buffer.
+     *
+     * @param id log id reported by {@link #logId()}
+     * @param filename path of the log file
+     * @param maxFileSize bytes to pre-allocate with fallocate(2) when available; must be positive
+     * @param writeExecutor executor that runs the asynchronous pwrite tasks
+     * @param bufferPool pool providing aligned buffers
+     * @param nativeIO native I/O bindings
+     * @param slog logger used to report that fallocate is unavailable
+     * @throws IOException if the file cannot be opened
+     */
+    DirectWriter(int id,
+                 String filename,
+                 long maxFileSize,
+                 ExecutorService writeExecutor,
+                 BufferPool bufferPool,
+                 NativeIO nativeIO, Slogger slog) throws IOException {
+        // Pass maxFileSize so the %d placeholder in the message is actually filled in.
+        checkArgument(maxFileSize > 0, "Max file size (%d) must be positive", maxFileSize);
+        this.id = id;
+        this.filename = filename;
+        this.writeExecutor = writeExecutor;
+        this.nativeIO = nativeIO;
+        this.bufferPool = bufferPool;
+
+        offset = 0;
+
+        try {
+            // O_DIRECT requires aligned offsets/lengths (see open(2)), hence the alignment checks
+            // elsewhere in this class.
+            // NOTE(review): mode 00755 makes the log file executable; 00644 is probably enough for a
+            // data file - confirm before changing.
+            fd = nativeIO.open(filename,
+                               NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT,
+                               00755);
+            checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage()).kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        }
+
+        try {
+            if (useFallocate) {
+                if (!SystemUtils.IS_OS_LINUX) {
+                    useFallocate = false;
+                    slog.warn(Events.FALLOCATE_NOT_AVAILABLE);
+                } else {
+                    try {
+                        int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize);
+                        checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret);
+                    } catch (NativeIOException ex) {
+                        // fallocate(2) is not supported on all filesystems.  Since this is an optimization, disable
+                        // subsequent usage instead of failing the operation.
+                        useFallocate = false;
+                        slog.kv("message", ex.getMessage())
+                            .kv("file", filename)
+                            .kv("errno", ex.getErrno())
+                            .warn(Events.FALLOCATE_NOT_AVAILABLE);
+                    }
+                }
+            }
+
+            this.nativeBuffer = bufferPool.acquire();
+        } catch (Throwable t) {
+            // The descriptor is already open; close it rather than leaking it when the rest of
+            // construction fails.
+            try {
+                nativeIO.close(fd);
+            } catch (Throwable closeFailure) {
+                t.addSuppressed(closeFailure);
+            }
+            throw t;
+        }
+    }
+
+    @Override
+    public int logId() {
+        // Identifier supplied when this writer was constructed.
+        return this.id;
+    }
+
+    /**
+     * Asynchronously writes the readable bytes of {@code buf} at {@code offset}, bypassing the staging buffer.
+     * Both the offset and the length must be multiples of {@code Buffer.ALIGNMENT}. Failures of the
+     * write itself are reported by a later addOutstandingWrite/flush call.
+     *
+     * @throws IllegalArgumentException if offset or length is not aligned
+     * @throws IOException if an earlier completed write is found to have failed
+     */
+    @Override
+    public void writeAt(long offset, ByteBuf buf) throws IOException {
+        checkArgument(Buffer.isAligned(offset),
+                      "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset);
+        checkArgument(Buffer.isAligned(buf.readableBytes()),
+                      "Buffer must write multiple of alignment bytes (%d), %d is not",
+                      Buffer.ALIGNMENT, buf.readableBytes());
+        // Copy the caller's data into a pooled buffer. Once the task below is successfully submitted
+        // it owns the buffer and releases it when the write finishes.
+        Buffer tmpBuffer = bufferPool.acquire();
+        int bytesToWrite = buf.readableBytes();
+        Future<?> f;
+        try {
+            tmpBuffer.reset();
+            tmpBuffer.writeByteBuf(buf);
+            f = writeExecutor.submit(() -> {
+                    try {
+                        int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset);
+                        if (ret != bytesToWrite) {
+                            throw new IOException(exMsg("Incomplete write")
+                                                  .kv("filename", filename)
+                                                  .kv("writeSize", bytesToWrite)
+                                                  .kv("bytesWritten", ret)
+                                                  .kv("offset", offset).toString());
+                        }
+                    } catch (NativeIOException ne) {
+                        // Chain the native exception so its message and stack trace are not lost.
+                        throw new IOException(exMsg("Write error")
+                                              .kv("filename", filename)
+                                              .kv("writeSize", bytesToWrite)
+                                              .kv("errno", ne.getErrno())
+                                              .kv("offset", offset).toString(), ne);
+                    } finally {
+                        bufferPool.release(tmpBuffer);
+                    }
+                    return null;
+                });
+        } catch (Throwable t) {
+            // The task was never scheduled (the copy failed or the executor rejected it), so it cannot
+            // release tmpBuffer itself; return it to the pool here instead of leaking it.
+            bufferPool.release(tmpBuffer);
+            throw t;
+        }
+        addOutstandingWrite(f);
+    }
+
+    /**
+     * Appends {@code buf} to the staging buffer, preceded by its length as a 4-byte int.
+     *
+     * @return the file position of the payload, i.e. just past the length prefix
+     * @throws IOException if that position would not fit in an int, or if flushing fails
+     */
+    @Override
+    public int writeDelimited(ByteBuf buf) throws IOException {
+        synchronized (bufferLock) {
+            // Push out the current staging buffer first if this entry will not fit in it.
+            boolean fits = nativeBuffer.hasSpace(serializedSize(buf));
+            if (!fits) {
+                flushBuffer();
+            }
+
+            final int entrySize = buf.readableBytes();
+            final long payloadPosition = Integer.BYTES + position();
+            if (payloadPosition <= Integer.MAX_VALUE) {
+                nativeBuffer.writeInt(entrySize);
+                nativeBuffer.writeByteBuf(buf);
+                return (int) payloadPosition;
+            }
+            throw new IOException(exMsg("Cannot write past max int")
+                                  .kv("filename", filename)
+                                  .kv("writeSize", entrySize)
+                                  .kv("position", payloadPosition)
+                                  .toString());
+        }
+    }
+
+    /**
+     * Moves the write position to {@code offset}, first flushing any staged bytes.
+     *
+     * @param offset new file offset; must be non-negative and a multiple of {@code Buffer.ALIGNMENT}
+     * @throws IOException if the offset is invalid or flushing fails
+     */
+    @Override
+    public void position(long offset) throws IOException {
+        // Validate before touching any state so a bad argument leaves the writer unchanged.
+        // The modulo test alone would accept negative multiples such as -ALIGNMENT.
+        if (offset < 0) {
+            throw new IOException(exMsg("offset must not be negative")
+                                  .kv("offset", offset)
+                                  .toString());
+        }
+        if ((offset % Buffer.ALIGNMENT) != 0) {
+            throw new IOException(exMsg("offset must be multiple of alignment")
+                                  .kv("offset", offset)
+                                  .kv("alignment", Buffer.ALIGNMENT)
+                                  .toString());
+        }
+        synchronized (bufferLock) {
+            if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                flushBuffer();
+            }
+            this.offset = offset;
+        }
+    }
+
+    @Override
+    public long position() {
+        synchronized (bufferLock) {
+            return this.offset + (nativeBuffer != null ? nativeBuffer.position() : 0);
+        }
+    }
+
+    /**
+     * Writes out staged bytes, waits for every submitted write to finish, then fsyncs the file.
+     *
+     * @throws IOException if any write or the fsync fails
+     */
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+
+        waitForOutstandingWrites();
+
+        try {
+            int ret = nativeIO.fsync(fd);
+            checkState(ret == 0, "Fsync should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            // Chain the native exception so its stack trace is not lost.
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        }
+    }
+
+    /**
+     * Writes out staged bytes, waits for pending writes, closes the descriptor and returns the
+     * staging buffer to the pool. Does not fsync.
+     *
+     * @throws IOException if a pending write or closing the descriptor fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            synchronized (bufferLock) {
+                if (nativeBuffer != null && nativeBuffer.position() > 0) {
+                    flushBuffer();
+                }
+            }
+            // Writes run pwrite(2) against fd asynchronously on writeExecutor; they must complete
+            // before fd is closed, otherwise they could fail or hit a reused descriptor number.
+            waitForOutstandingWrites();
+        } catch (IOException | RuntimeException e) {
+            try {
+                closeFdAndReleaseBuffer();
+            } catch (IOException | RuntimeException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        closeFdAndReleaseBuffer();
+    }
+
+    /** Closes the file descriptor and, even if that fails, returns the staging buffer to the pool. */
+    private void closeFdAndReleaseBuffer() throws IOException {
+        try {
+            int ret = nativeIO.close(fd);
+            checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret);
+        } catch (NativeIOException ne) {
+            throw new IOException(exMsg(ne.getMessage())
+                                  .kv("file", filename)
+                                  .kv("errno", ne.getErrno()).toString(), ne);
+        } finally {
+            synchronized (bufferLock) {
+                if (nativeBuffer != null) {
+                    bufferPool.release(nativeBuffer);
+                    nativeBuffer = null;
+                }
+            }
+        }
+    }
+
+    /**
+     * Records a submitted write and reaps the leading run of already-completed writes.
+     * A completed write that failed is rethrown and left at the head of the list.
+     */
+    private void addOutstandingWrite(Future<?> toAdd) throws IOException {
+        synchronized (outstandingWrites) {
+            outstandingWrites.add(toAdd);
+
+            // Stop at the first write that is still in flight; later ones are checked next time.
+            while (!outstandingWrites.isEmpty()) {
+                Future<?> head = outstandingWrites.get(0);
+                if (!head.isDone()) {
+                    break;
+                }
+                waitForFuture(head);
+                outstandingWrites.remove(0);
+            }
+        }
+    }
+
+    /**
+     * Blocks on every recorded write in submission order, dropping each one once it has succeeded.
+     * Stops at, and rethrows, the first failure.
+     */
+    private void waitForOutstandingWrites() throws IOException {
+        synchronized (outstandingWrites) {
+            while (!outstandingWrites.isEmpty()) {
+                waitForFuture(outstandingWrites.get(0));
+                outstandingWrites.remove(0);
+            }
+        }
+    }
+
+    /**
+     * Waits for {@code f}. An IOException raised by the task is rethrown unwrapped; anything else is
+     * wrapped in an IOException.
+     */
+    private void waitForFuture(Future<?> f) throws IOException {
+        try {
+            f.get();
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            throw new IOException(interrupted);
+        } catch (Throwable failure) {
+            Throwable cause = failure.getCause();
+            throw cause instanceof IOException ? (IOException) cause : new IOException(failure);
+        }
+    }
+
+    private void flushBuffer() throws IOException {
+        synchronized (bufferLock) {
+            if (this.nativeBuffer != null) {
+                int bytesToWrite = this.nativeBuffer.padToAlignment();
+                if (bytesToWrite == 0) {
+                    return;
+                }
+
+                Buffer bufferToFlush = this.nativeBuffer;
+                this.nativeBuffer = null;
+
+                long offsetToWrite = offset;
+                offset += bytesToWrite;
+
+                Future<?> f = writeExecutor.submit(() -> {
+                        try {
+                            if (bytesToWrite <= 0) {
+                                return null;
+                            }
+                            int ret = nativeIO.pwrite(fd, bufferToFlush.pointer(), bytesToWrite, offsetToWrite);
+                            if (ret != bytesToWrite) {
+                                throw new IOException(exMsg("Incomplete write")
+                                                      .kv("filename", filename)
+                                                      .kv("pointer", bufferToFlush.pointer())
+                                                      .kv("offset", offsetToWrite)
+                                                      .kv("writeSize", bytesToWrite)
+                                                      .kv("bytesWritten", ret).toString());
+                            }
+                        } catch (NativeIOException ne) {
+                            throw new IOException(exMsg(ne.getMessage())
+                                                  .kv("filename", filename)
+                                                  .kv("offset", offsetToWrite)
+                                                  .kv("writeSize", bytesToWrite)
+                                                  .kv("pointer", bufferToFlush.pointer())
+                                                  .kv("errno", ne.getErrno()).toString(), ne);
+                        } finally {
+                            bufferPool.release(bufferToFlush);
+                        }
+                        return null;

Review Comment:
   This looks the same as the pwrite task in writeAt (lines 120+). Consider refactoring it into a shared helper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org