You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/24 22:25:20 UTC
[druid] branch 0.18.1 updated: Optimize FileWriteOutBytes to avoid
high system cpu usage (#9722) (#9770)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.1 by this push:
new 29073c5 Optimize FileWriteOutBytes to avoid high system cpu usage (#9722) (#9770)
29073c5 is described below
commit 29073c56ec0758f6e43dd21e2f4b2e9976c8f3cc
Author: Suneet Saldanha <44...@users.noreply.github.com>
AuthorDate: Fri Apr 24 15:25:06 2020 -0700
Optimize FileWriteOutBytes to avoid high system cpu usage (#9722) (#9770)
* optimize FileWriteOutBytes to avoid high sys cpu
* optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException
* optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size
* Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size"
This reverts commit 965f7421
* Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException"
This reverts commit 149e08c0
* optimize FileWriteOutBytes to avoid high sys cpu -- avoid IOException never thrown check
* Fix size counting to handle IOE in FileWriteOutBytes + tests
* remove unused throws IOException in WriteOutBytes.size()
* Remove redundant throws IOException clauses
* Parameterize IndexMergeBenchmark
Co-authored-by: huanghui.bigrey <hu...@bytedance.com>
Co-authored-by: Suneet Saldanha <su...@imply.io>
Co-authored-by: BIGrey <hu...@163.com>
Co-authored-by: huanghui.bigrey <hu...@bytedance.com>
---
.../benchmark/indexing/IndexMergeBenchmark.java | 87 ++++++++++++--------
.../druid/segment/data/ByteBufferWriter.java | 2 +-
.../EntireLayoutColumnarDoublesSerializer.java | 2 +-
.../data/EntireLayoutColumnarFloatsSerializer.java | 2 +-
.../druid/segment/data/GenericIndexedWriter.java | 6 +-
.../segment/serde/ComplexColumnSerializer.java | 2 +-
...argeColumnSupportedComplexColumnSerializer.java | 2 +-
.../druid/segment/serde/MetaSerdeHelper.java | 6 +-
.../druid/segment/writeout/FileWriteOutBytes.java | 12 ++-
.../druid/segment/writeout/WriteOutBytes.java | 2 +-
.../segment/writeout/FileWriteOutBytesTest.java | 95 ++++++++++++++++++++--
11 files changed, 161 insertions(+), 57 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index 5f330ee..6b3b902 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -35,11 +35,13 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -66,6 +68,7 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class IndexMergeBenchmark
{
+
@Param({"5"})
private int numSegments;
@@ -78,9 +81,13 @@ public class IndexMergeBenchmark
@Param({"true", "false"})
private boolean rollup;
+ @Param({"OFF_HEAP", "TMP_FILE", "ON_HEAP"})
+ private SegmentWriteOutType factoryType;
+
+
private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
- private static final IndexMergerV9 INDEX_MERGER_V9;
+
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
@@ -91,6 +98,7 @@ public class IndexMergeBenchmark
private List<QueryableIndex> indexesToMerge;
private BenchmarkSchemaInfo schemaInfo;
private File tmpDir;
+ private IndexMergerV9 indexMergerV9;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -99,23 +107,16 @@ public class IndexMergeBenchmark
JSON_MAPPER.setInjectableValues(injectableValues);
INDEX_IO = new IndexIO(
JSON_MAPPER,
- new ColumnConfig()
- {
- @Override
- public int columnCacheSizeBytes()
- {
- return 0;
- }
- }
+ () -> 0
);
- INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}
@Setup
public void setup() throws IOException
{
- log.info("SETUP CALLED AT " + System.currentTimeMillis());
+ log.info("SETUP CALLED AT " + System.currentTimeMillis());
+ indexMergerV9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, getSegmentWriteOutMediumFactory(factoryType));
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
indexesToMerge = new ArrayList<>();
@@ -143,7 +144,7 @@ public class IndexMergeBenchmark
tmpDir = FileUtils.createTempDir();
log.info("Using temp dir: " + tmpDir.getAbsolutePath());
- File indexFile = INDEX_MERGER_V9.persist(
+ File indexFile = indexMergerV9.persist(
incIndex,
tmpDir,
new IndexSpec(),
@@ -155,26 +156,6 @@ public class IndexMergeBenchmark
}
}
- @TearDown
- public void tearDown() throws IOException
- {
- FileUtils.deleteDirectory(tmpDir);
- }
-
- private IncrementalIndex makeIncIndex()
- {
- return new IncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withMetrics(schemaInfo.getAggsArray())
- .withRollup(rollup)
- .build()
- )
- .setReportParseExceptions(false)
- .setMaxRowCount(rowsPerSegment)
- .buildOnheap();
- }
-
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -186,7 +167,7 @@ public class IndexMergeBenchmark
try {
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
- File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
+ File mergedFile = indexMergerV9.mergeQueryableIndex(
indexesToMerge,
rollup,
schemaInfo.getAggsArray(),
@@ -199,8 +180,46 @@ public class IndexMergeBenchmark
}
finally {
tmpFile.delete();
+ }
+ }
+
+ @TearDown
+ public void tearDown() throws IOException
+ {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+
+ public enum SegmentWriteOutType
+ {
+ TMP_FILE,
+ OFF_HEAP,
+ ON_HEAP
+ }
+ private SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(SegmentWriteOutType type)
+ {
+ switch (type) {
+ case TMP_FILE:
+ return TmpFileSegmentWriteOutMediumFactory.instance();
+ case OFF_HEAP:
+ return OffHeapMemorySegmentWriteOutMediumFactory.instance();
+ case ON_HEAP:
+ return OnHeapMemorySegmentWriteOutMediumFactory.instance();
}
+ throw new RuntimeException("Could not create SegmentWriteOutMediumFactory of type: " + type);
+ }
+ private IncrementalIndex makeIncIndex()
+ {
+ return new IncrementalIndex.Builder()
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(schemaInfo.getAggsArray())
+ .withRollup(rollup)
+ .build()
+ )
+ .setReportParseExceptions(false)
+ .setMaxRowCount(rowsPerSegment)
+ .buildOnheap();
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
index ee1e781..bda8115 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
@@ -63,7 +63,7 @@ public class ByteBufferWriter<T> implements Serializer
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
return headerOut.size() + valueOut.size();
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
index b330319..6265535 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
@@ -80,7 +80,7 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
index 75b7290..a933265 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
@@ -81,7 +81,7 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 95bc141..d4254ddd 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -296,7 +296,7 @@ public class GenericIndexedWriter<T> implements Serializer
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
if (requireMultipleFiles) {
// for multi-file version (version 2), getSerializedSize() returns number of bytes in meta file.
@@ -394,7 +394,7 @@ public class GenericIndexedWriter<T> implements Serializer
*
* @throws IOException
*/
- private int bagSizePower() throws IOException
+ private int bagSizePower()
{
long avgObjectSize = (valuesOut.size() + numWritten - 1) / numWritten;
@@ -421,7 +421,7 @@ public class GenericIndexedWriter<T> implements Serializer
*
* @throws IOException
*/
- private boolean actuallyFits(int powerTwo) throws IOException
+ private boolean actuallyFits(int powerTwo)
{
long lastValueOffset = 0;
long currentValueOffset = 0;
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
index 1441c78..c7a2d59 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
@@ -66,7 +66,7 @@ public class ComplexColumnSerializer<T> implements GenericColumnSerializer<T>
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
return writer.getSerializedSize();
}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
index bd68aef..c23c97c 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
@@ -106,7 +106,7 @@ public class LargeColumnSupportedComplexColumnSerializer<T> implements GenericCo
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
return writer.getSerializedSize();
}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
index ddda31f..113821c 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
@@ -117,7 +117,7 @@ public final class MetaSerdeHelper<T>
public interface FieldWriter<T>
{
- void writeTo(ByteBuffer buffer, T x) throws IOException;
+ void writeTo(ByteBuffer buffer, T x);
int size(T x);
}
@@ -125,10 +125,10 @@ public final class MetaSerdeHelper<T>
@FunctionalInterface
public interface IntFieldWriter<T> extends FieldWriter<T>
{
- int getField(T x) throws IOException;
+ int getField(T x);
@Override
- default void writeTo(ByteBuffer buffer, T x) throws IOException
+ default void writeTo(ByteBuffer buffer, T x)
{
buffer.putInt(getField(x));
}
diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
index 9ab579d..b12b15e 100644
--- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
+++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
@@ -36,6 +36,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
private final File file;
private final FileChannel ch;
+ private long writeOutBytes;
/** Purposely big-endian, for {@link #writeInt(int)} implementation */
private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer
@@ -44,6 +45,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
this.file = file;
this.ch = ch;
+ this.writeOutBytes = 0L;
}
private void flushIfNeeded(int bytesNeeded) throws IOException
@@ -66,6 +68,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
flushIfNeeded(1);
buffer.put((byte) b);
+ writeOutBytes++;
}
@Override
@@ -73,6 +76,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
flushIfNeeded(Integer.BYTES);
buffer.putInt(v);
+ writeOutBytes += Integer.BYTES;
}
@Override
@@ -85,6 +89,7 @@ final class FileWriteOutBytes extends WriteOutBytes
try {
src.limit(src.position() + buffer.capacity());
buffer.put(src);
+ writeOutBytes += buffer.capacity();
flush();
}
finally {
@@ -92,7 +97,9 @@ final class FileWriteOutBytes extends WriteOutBytes
src.limit(srcLimit);
}
}
+ int remaining = src.remaining();
buffer.put(src);
+ writeOutBytes += remaining;
return len;
}
@@ -103,10 +110,9 @@ final class FileWriteOutBytes extends WriteOutBytes
}
@Override
- public long size() throws IOException
+ public long size()
{
- flush();
- return ch.size();
+ return writeOutBytes;
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
index f6a5e71..e1c2972 100644
--- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
+++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
@@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte
/**
* Returns the number of bytes written to this WriteOutBytes so far.
*/
- public abstract long size() throws IOException;
+ public abstract long size();
/**
* Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel.
diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
index cfaa418..8501fa6 100644
--- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.writeout;
import org.easymock.EasyMock;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -36,28 +37,106 @@ public class FileWriteOutBytesTest
@Before
public void setUp()
{
- this.mockFileChannel = EasyMock.mock(FileChannel.class);
- this.fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
+ mockFileChannel = EasyMock.mock(FileChannel.class);
+ fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
}
@Test
- public void testWrite4KBInts() throws IOException
+ public void write4KBIntsShouldNotFlush() throws IOException
{
// Write 4KB of ints and expect the write operation of the file channel will be triggered only once.
- EasyMock.expect(this.mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+ EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
.andAnswer(() -> {
ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
int remaining = buffer.remaining();
buffer.position(remaining);
return remaining;
}).times(1);
- EasyMock.replay(this.mockFileChannel);
+ EasyMock.replay(mockFileChannel);
final int writeBytes = 4096;
final int numOfInt = writeBytes / Integer.BYTES;
for (int i = 0; i < numOfInt; i++) {
- this.fileWriteOutBytes.writeInt(i);
+ fileWriteOutBytes.writeInt(i);
}
- this.fileWriteOutBytes.flush();
- EasyMock.verify(this.mockFileChannel);
+ // no need to flush up to 4KB
+ // the first byte after 4KB will cause a flush
+ fileWriteOutBytes.write(1);
+ EasyMock.verify(mockFileChannel);
+ }
+
+ @Test
+ public void writeShouldIncrementSize() throws IOException
+ {
+ fileWriteOutBytes.write(1);
+ Assert.assertEquals(1, fileWriteOutBytes.size());
+ }
+
+ @Test
+ public void writeIntShouldIncrementSize() throws IOException
+ {
+ fileWriteOutBytes.writeInt(1);
+ Assert.assertEquals(4, fileWriteOutBytes.size());
+ }
+
+ @Test
+ public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+ {
+ EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+ .andAnswer(() -> {
+ ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+ int remaining = buffer.remaining();
+ buffer.position(remaining);
+ return remaining;
+ }).times(1);
+ EasyMock.replay(mockFileChannel);
+ ByteBuffer src = ByteBuffer.allocate(4096 + 1);
+ fileWriteOutBytes.write(src);
+ Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+ EasyMock.verify(mockFileChannel);
+ }
+
+ @Test
+ public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException
+ {
+ EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+ .andAnswer(() -> {
+ ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+ int remaining = buffer.remaining();
+ buffer.position(remaining);
+ return remaining;
+ }).once();
+ EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+ .andThrow(new IOException())
+ .once();
+ EasyMock.replay(mockFileChannel);
+ ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1);
+ try {
+ fileWriteOutBytes.write(src);
+ Assert.fail("IOException should have been thrown.");
+ }
+ catch (IOException e) {
+ // The second invocation to flush bytes fails. So the size should count what has already been put successfully
+ Assert.assertEquals(4096 * 2, fileWriteOutBytes.size());
+ }
+ }
+
+ @Test
+ public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+ {
+ ByteBuffer src = ByteBuffer.allocate(4096);
+ fileWriteOutBytes.write(src);
+ Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+ }
+ @Test
+ public void sizeDoesNotFlush() throws IOException
+ {
+ EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+ .andThrow(new AssertionError("file channel should not have been written to."));
+ EasyMock.replay(mockFileChannel);
+ long size = fileWriteOutBytes.size();
+ Assert.assertEquals(0, size);
+ fileWriteOutBytes.writeInt(10);
+ size = fileWriteOutBytes.size();
+ Assert.assertEquals(4, size);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org