You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/12/15 01:19:30 UTC

[incubator-druid] branch 0.17.0-incubating updated: GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029) (#9038)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.17.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.17.0-incubating by this push:
     new 23b9da7  GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029) (#9038)
23b9da7 is described below

commit 23b9da7b3c2e3c9dbe7c7b5746d1ae666904149d
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Sat Dec 14 17:19:12 2019 -0800

    GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029) (#9038)
---
 .../druid/segment/data/GenericIndexedWriter.java   | 63 +++++++++++++++-----
 .../segment/data/GenericIndexedWriterTest.java     | 68 ++++++++++++++++++++++
 2 files changed, 118 insertions(+), 13 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 976c88f..7578506 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.data;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
@@ -48,7 +49,7 @@ import java.nio.channels.WritableByteChannel;
  */
 public class GenericIndexedWriter<T> implements Serializer
 {
-  private static int PAGE_SIZE = 4096;
+  private static final int PAGE_SIZE = 4096;
 
   private static final MetaSerdeHelper<GenericIndexedWriter> SINGLE_FILE_META_SERDE_HELPER = MetaSerdeHelper
       .firstWriteByte((GenericIndexedWriter x) -> GenericIndexed.VERSION_ONE)
@@ -148,9 +149,16 @@ public class GenericIndexedWriter<T> implements Serializer
   @Nullable
   private LongList headerOutLong;
 
+  // Used by checkedCastNonnegativeLongToInt. Will always be Integer.MAX_VALUE in production.
+  private int intMaxForCasting = Integer.MAX_VALUE;
+
   private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES);
 
-  public GenericIndexedWriter(SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ObjectStrategy<T> strategy)
+  public GenericIndexedWriter(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      String filenameBase,
+      ObjectStrategy<T> strategy
+  )
   {
     this(segmentWriteOutMedium, filenameBase, strategy, Integer.MAX_VALUE & ~PAGE_SIZE);
   }
@@ -210,13 +218,18 @@ public class GenericIndexedWriter<T> implements Serializer
     objectsSorted = false;
   }
 
+  @VisibleForTesting
+  void setIntMaxForCasting(final int intMaxForCasting)
+  {
+    this.intMaxForCasting = intMaxForCasting;
+  }
+
   public void write(@Nullable T objectToWrite) throws IOException
   {
     if (objectsSorted && prevObject != null && strategy.compare(prevObject, objectToWrite) >= 0) {
       objectsSorted = false;
     }
 
-    ++numWritten;
     // for compatibility with the format (see GenericIndexed javadoc for description of the format),
     // this field is used to store nullness marker, but in a better format this info can take 1 bit.
     valuesOut.writeInt(objectToWrite == null ? GenericIndexed.NULL_VALUE_SIZE_MARKER : 0);
@@ -224,17 +237,28 @@ public class GenericIndexedWriter<T> implements Serializer
       strategy.writeTo(objectToWrite, valuesOut);
     }
 
-    if (!requireMultipleFiles) {
-      headerOut.writeInt(Ints.checkedCast(valuesOut.size()));
-    } else {
-      headerOutLong.add(valuesOut.size());
-    }
-
+    // Before updating the header, check if we need to switch to multi-file mode.
     if (!requireMultipleFiles && getSerializedSize() > fileSizeLimit) {
       requireMultipleFiles = true;
       initializeHeaderOutLong();
     }
 
+    // Increment number of values written. Important to do this after the check above, since numWritten is
+    // accessed during "initializeHeaderOutLong" to determine the length of the header.
+    ++numWritten;
+
+    if (!requireMultipleFiles) {
+      headerOut.writeInt(checkedCastNonnegativeLongToInt(valuesOut.size()));
+
+      // Check _again_ if we need to switch to multi-file mode. (We might need to after updating the header.)
+      if (getSerializedSize() > fileSizeLimit) {
+        requireMultipleFiles = true;
+        initializeHeaderOutLong();
+      }
+    } else {
+      headerOutLong.add(valuesOut.size());
+    }
+
     if (objectsSorted) {
       prevObject = objectToWrite;
     }
@@ -250,7 +274,7 @@ public class GenericIndexedWriter<T> implements Serializer
       startOffset = getOffset(index - 1) + Integer.BYTES;
     }
     long endOffset = getOffset(index);
-    int valueSize = Ints.checkedCast(endOffset - startOffset);
+    int valueSize = checkedCastNonnegativeLongToInt(endOffset - startOffset);
     if (valueSize == 0) {
       return null;
     }
@@ -391,7 +415,7 @@ public class GenericIndexedWriter<T> implements Serializer
   /**
    * Checks if candidate value splits can divide value file in such a way no object/element crosses the value splits.
    *
-   * @param powerTwo   candidate value split expressed as power of 2.
+   * @param powerTwo candidate value split expressed as power of 2.
    *
    * @return true if candidate value split can hold all splits.
    *
@@ -410,7 +434,7 @@ public class GenericIndexedWriter<T> implements Serializer
       if (headerIndex >= numWritten) {
         return true;
       } else if (headerIndex + bagSize <= numWritten) {
-        currentValueOffset = headerOutLong.getLong(Ints.checkedCast(headerIndex + bagSize - 1));
+        currentValueOffset = headerOutLong.getLong(checkedCastNonnegativeLongToInt(headerIndex + bagSize - 1));
       } else if (numWritten < headerIndex + bagSize) {
         currentValueOffset = headerOutLong.getLong(numWritten - 1);
       }
@@ -446,7 +470,7 @@ public class GenericIndexedWriter<T> implements Serializer
         }
         currentNumBytes = headerOutLong.getLong(pos);
         relativeNumBytes = currentNumBytes - relativeRefBytes;
-        helperBuffer.putInt(0, Ints.checkedCast(relativeNumBytes));
+        helperBuffer.putInt(0, checkedCastNonnegativeLongToInt(relativeNumBytes));
         helperBuffer.clear();
         smooshChannel.write(helperBuffer);
       }
@@ -464,4 +488,17 @@ public class GenericIndexedWriter<T> implements Serializer
     }
   }
 
+  /**
+   * Cast a long to an int, throwing an exception if it is out of range. Uses "intMaxForCasting" as the max
+   * integer value. Only works for nonnegative "n".
+   */
+  private int checkedCastNonnegativeLongToInt(final long n)
+  {
+    if (n >= 0 && n <= intMaxForCasting) {
+      return (int) n;
+    } else {
+      // Likely bug.
+      throw new IAE("Value out of nonnegative int range");
+    }
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java
new file mode 100644
index 0000000..7be04c3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+public class GenericIndexedWriterTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void staticSetUp()
+  {
+    NullHandling.initializeForTests();
+  }
+
+  @Test
+  public void writeLargeValueIntoLargeColumn() throws IOException
+  {
+    // Regression test for https://github.com/apache/incubator-druid/issues/9027.
+
+    final GenericIndexedWriter<String> writer = new GenericIndexedWriter<>(
+        new OnHeapMemorySegmentWriteOutMedium(),
+        "test",
+        GenericIndexed.STRING_STRATEGY,
+        100
+    );
+
+    writer.setIntMaxForCasting(150);
+    writer.open();
+    writer.write("i really like writing strings");
+    writer.write("i really like writing strings");
+    writer.write("i really like writing strings i really like writing strings i really like writing strings");
+    writer.write("i really like writing strings");
+    writer.writeTo(
+        FileChannel.open(temporaryFolder.newFile().toPath(), StandardOpenOption.WRITE),
+        new FileSmoosher(temporaryFolder.newFolder())
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org