You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@druid.apache.org by cw...@apache.org on 2019/12/15 01:19:30 UTC
[incubator-druid] branch 0.17.0-incubating updated: GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029) (#9038)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.17.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.17.0-incubating by this push:
new 23b9da7 GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029) (#9038)
23b9da7 is described below
commit 23b9da7b3c2e3c9dbe7c7b5746d1ae666904149d
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Sat Dec 14 17:19:12 2019 -0800
GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029) (#9038)
---
.../druid/segment/data/GenericIndexedWriter.java | 63 +++++++++++++++-----
.../segment/data/GenericIndexedWriterTest.java | 68 ++++++++++++++++++++++
2 files changed, 118 insertions(+), 13 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 976c88f..7578506 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.data;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.longs.LongArrayList;
@@ -48,7 +49,7 @@ import java.nio.channels.WritableByteChannel;
*/
public class GenericIndexedWriter<T> implements Serializer
{
- private static int PAGE_SIZE = 4096;
+ private static final int PAGE_SIZE = 4096;
private static final MetaSerdeHelper<GenericIndexedWriter> SINGLE_FILE_META_SERDE_HELPER = MetaSerdeHelper
.firstWriteByte((GenericIndexedWriter x) -> GenericIndexed.VERSION_ONE)
@@ -148,9 +149,16 @@ public class GenericIndexedWriter<T> implements Serializer
@Nullable
private LongList headerOutLong;
+ // Used by checkedCastNonnegativeLongToInt. Will always be Integer.MAX_VALUE in production.
+ private int intMaxForCasting = Integer.MAX_VALUE;
+
private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES);
- public GenericIndexedWriter(SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ObjectStrategy<T> strategy)
+ public GenericIndexedWriter(
+ SegmentWriteOutMedium segmentWriteOutMedium,
+ String filenameBase,
+ ObjectStrategy<T> strategy
+ )
{
this(segmentWriteOutMedium, filenameBase, strategy, Integer.MAX_VALUE & ~PAGE_SIZE);
}
@@ -210,13 +218,18 @@ public class GenericIndexedWriter<T> implements Serializer
objectsSorted = false;
}
+ @VisibleForTesting
+ void setIntMaxForCasting(final int intMaxForCasting)
+ {
+ this.intMaxForCasting = intMaxForCasting;
+ }
+
public void write(@Nullable T objectToWrite) throws IOException
{
if (objectsSorted && prevObject != null && strategy.compare(prevObject, objectToWrite) >= 0) {
objectsSorted = false;
}
- ++numWritten;
// for compatibility with the format (see GenericIndexed javadoc for description of the format),
// this field is used to store nullness marker, but in a better format this info can take 1 bit.
valuesOut.writeInt(objectToWrite == null ? GenericIndexed.NULL_VALUE_SIZE_MARKER : 0);
@@ -224,17 +237,28 @@ public class GenericIndexedWriter<T> implements Serializer
strategy.writeTo(objectToWrite, valuesOut);
}
- if (!requireMultipleFiles) {
- headerOut.writeInt(Ints.checkedCast(valuesOut.size()));
- } else {
- headerOutLong.add(valuesOut.size());
- }
-
+ // Before updating the header, check if we need to switch to multi-file mode.
if (!requireMultipleFiles && getSerializedSize() > fileSizeLimit) {
requireMultipleFiles = true;
initializeHeaderOutLong();
}
+ // Increment number of values written. Important to do this after the check above, since numWritten is
+ // accessed during "initializeHeaderOutLong" to determine the length of the header.
+ ++numWritten;
+
+ if (!requireMultipleFiles) {
+ headerOut.writeInt(checkedCastNonnegativeLongToInt(valuesOut.size()));
+
+ // Check _again_ if we need to switch to multi-file mode. (We might need to after updating the header.)
+ if (getSerializedSize() > fileSizeLimit) {
+ requireMultipleFiles = true;
+ initializeHeaderOutLong();
+ }
+ } else {
+ headerOutLong.add(valuesOut.size());
+ }
+
if (objectsSorted) {
prevObject = objectToWrite;
}
@@ -250,7 +274,7 @@ public class GenericIndexedWriter<T> implements Serializer
startOffset = getOffset(index - 1) + Integer.BYTES;
}
long endOffset = getOffset(index);
- int valueSize = Ints.checkedCast(endOffset - startOffset);
+ int valueSize = checkedCastNonnegativeLongToInt(endOffset - startOffset);
if (valueSize == 0) {
return null;
}
@@ -391,7 +415,7 @@ public class GenericIndexedWriter<T> implements Serializer
/**
* Checks if candidate value splits can divide value file in such a way no object/element crosses the value splits.
*
- * @param powerTwo candidate value split expressed as power of 2.
+ * @param powerTwo candidate value split expressed as power of 2.
*
* @return true if candidate value split can hold all splits.
*
@@ -410,7 +434,7 @@ public class GenericIndexedWriter<T> implements Serializer
if (headerIndex >= numWritten) {
return true;
} else if (headerIndex + bagSize <= numWritten) {
- currentValueOffset = headerOutLong.getLong(Ints.checkedCast(headerIndex + bagSize - 1));
+ currentValueOffset = headerOutLong.getLong(checkedCastNonnegativeLongToInt(headerIndex + bagSize - 1));
} else if (numWritten < headerIndex + bagSize) {
currentValueOffset = headerOutLong.getLong(numWritten - 1);
}
@@ -446,7 +470,7 @@ public class GenericIndexedWriter<T> implements Serializer
}
currentNumBytes = headerOutLong.getLong(pos);
relativeNumBytes = currentNumBytes - relativeRefBytes;
- helperBuffer.putInt(0, Ints.checkedCast(relativeNumBytes));
+ helperBuffer.putInt(0, checkedCastNonnegativeLongToInt(relativeNumBytes));
helperBuffer.clear();
smooshChannel.write(helperBuffer);
}
@@ -464,4 +488,17 @@ public class GenericIndexedWriter<T> implements Serializer
}
}
+ /**
+ * Cast a long to an int, throwing an exception if it is out of range. Uses "intMaxForCasting" as the max
+ * integer value. Only works for nonnegative "n".
+ */
+ private int checkedCastNonnegativeLongToInt(final long n)
+ {
+ if (n >= 0 && n <= intMaxForCasting) {
+ return (int) n;
+ } else {
+ // Likely bug.
+ throw new IAE("Value out of nonnegative int range");
+ }
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java
new file mode 100644
index 0000000..7be04c3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+public class GenericIndexedWriterTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void staticSetUp()
+ {
+ NullHandling.initializeForTests();
+ }
+
+ @Test
+ public void writeLargeValueIntoLargeColumn() throws IOException
+ {
+ // Regression test for https://github.com/apache/incubator-druid/issues/9027.
+
+ final GenericIndexedWriter<String> writer = new GenericIndexedWriter<>(
+ new OnHeapMemorySegmentWriteOutMedium(),
+ "test",
+ GenericIndexed.STRING_STRATEGY,
+ 100
+ );
+
+ writer.setIntMaxForCasting(150);
+ writer.open();
+ writer.write("i really like writing strings");
+ writer.write("i really like writing strings");
+ writer.write("i really like writing strings i really like writing strings i really like writing strings");
+ writer.write("i really like writing strings");
+ writer.writeTo(
+ FileChannel.open(temporaryFolder.newFile().toPath(), StandardOpenOption.WRITE),
+ new FileSmoosher(temporaryFolder.newFolder())
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org