Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/01/06 07:30:26 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #11888: add 'TypeStrategy' to types

jihoonson commented on a change in pull request #11888:
URL: https://github.com/apache/druid/pull/11888#discussion_r779314885



##########
File path: core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import org.apache.druid.common.config.NullHandling;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * TypeStrategy provides value comparison and binary serialization for Druid types. This can be obtained for ANY Druid
+ * type via {@link TypeSignature#getStrategy()}.
+ *
+ * IMPORTANT!!! DO NOT USE THIS FOR WRITING COLUMNS, THERE ARE VERY LIKELY FAR BETTER WAYS TO DO THIS. However, if you
+ * need to store a single value or small number of values, continue reading.
+ *
+ * ALSO IMPORTANT!!! This is primarily intended for writing ephemeral values within a single process, and is not
+ * especially well suited (by itself) for persistent storage of data or cross process transfer. The support typically
+ * necessary for such more persistent storage, such as tracking version of a format or endianness of the values, should
+ * be handled externally to support these use cases.
+ *
+ * All implementations of this mechanism support reading and writing ONLY non-null values. To handle nulls inline with
+ * your values, consider {@link NullableTypeStrategy}, which might be acceptable to use if you need to read and write
+ * nullable values, AND, you have enough memory to burn a full byte for every value you want to store. It will store
+ * values with a leading byte containing either {@link NullHandling#IS_NULL_BYTE} or
+ * {@link NullHandling#IS_NOT_NULL_BYTE} as appropriate. If you have a lot of values to write and a lot of nulls,
+ * consider alternative approaches to tracking your nulls instead.
+ *
+ * This mechanism allows using the natural {@link ByteBuffer#position()}, modifying the underlying position as methods
+ * operate, and also allows random access reads at specific offsets, which do not modify the underlying position. If a
+ * method accepts an offset parameter, it does not modify the position; if not, it does.
+ *
+ * The only methods implementors are required to provide are {@link #read(ByteBuffer)},
+ * {@link #write(ByteBuffer, Object, int)} and {@link #estimateSizeBytes(Object)}; default implementations are provided
+ * to set and reset buffer positions as appropriate for the offset based methods, but may be overridden if a more
+ * optimized implementation is needed.
+ *
+ * Implementations of this interface should be thread safe, but may not use {@link ByteBuffer} in a thread safe manner,
+ * potentially modifying positions and limits, either temporarily or permanently depending on which set of methods is
+ * called.
+ */
+public interface TypeStrategy<T> extends Comparator<T>
+{
+  /**
+   * Estimate the size in bytes that writing this value to memory would require. This method is not required to be
+   * exactly correct, but many implementations might be. Implementations should err on the side of over-estimating if
+   * exact sizing is not efficient.
+   *
+   * Example uses of this method are estimating heap memory usage for an aggregator, or estimating the amount of buffer
+   * space that might need to be allocated before calling {@link #write} with a value.
+   */
+  int estimateSizeBytes(T value);
+
+
+  /**
+   * Read a non-null value from the {@link ByteBuffer} at the current {@link ByteBuffer#position()}. This will move
+   * the underlying position by the size of the value read.
+   *
+   * The contract of this method is that any value returned from this method MUST be completely detached from the
+   * underlying {@link ByteBuffer}, since it might outlive the memory location being allocated to hold the object.
+   * In other words, if an object is memory mapped, it must be copied on heap, or relocated to another memory location
+   * that is owned by the caller with {@link #write}.
+   */
+  T read(ByteBuffer buffer);
+
+  /**
+   * Write a non-null value to the {@link ByteBuffer} at position {@link ByteBuffer#position()}. This will move the
+   * underlying position by the size of the value written.
+   *
+   * This method returns the number of bytes written. If writing the value would take more than 'maxSizeBytes', this
+   * method will return a negative value indicating the number of additional bytes that would be required to fully
+   * write the value. Partial results may be written to the buffer when in this state, and the position may be left
+   * at whatever point the implementation ran out of space while writing the value. Callers should save the starting
+   * position if it is necessary to 'rewind' after a partial write.
+   *
+   * Callers MUST check the return value: a positive value indicates a successful write, while a negative value
+   * indicates a partial write.
+   *
+   * @return number of bytes written
+   */
+  int write(ByteBuffer buffer, T value, int maxSizeBytes);

Review comment:
       Should we document in the javadoc that this method can explode when `maxSizeBytes > buffer.remaining()`?
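To make the concern concrete, here is a minimal sketch, assuming a hypothetical length-prefixed string encoding (`PartialWriteSketch`, `writeString`, and `safeWrite` are illustrative names, not Druid API). A relative `ByteBuffer.put` throws `BufferOverflowException` whenever `buffer.remaining()` is too small, no matter what `maxSizeBytes` claims, so a caller can clamp `maxSizeBytes` to `buffer.remaining()` and rewind to a saved position on a negative result:

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PartialWriteSketch
{
  // Hypothetical length-prefixed string encoding following the quoted write() contract:
  // returns the bytes written on success, or a negative count of the additional bytes
  // needed when the value does not fit in maxSizeBytes.
  static int writeString(ByteBuffer buffer, String value, int maxSizeBytes)
  {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    int needed = Integer.BYTES + bytes.length;
    if (needed > maxSizeBytes) {
      return maxSizeBytes - needed;
    }
    // Relative puts advance the position and throw BufferOverflowException when
    // buffer.remaining() is too small, whatever maxSizeBytes claimed.
    buffer.putInt(bytes.length);
    buffer.put(bytes);
    return needed;
  }

  // Caller-side guard: never promise more room than the buffer has, and restore the
  // saved position on a negative result in case a strategy left a partial write behind.
  static int safeWrite(ByteBuffer buffer, String value, int maxSizeBytes)
  {
    int start = buffer.position();
    int written = writeString(buffer, value, Math.min(maxSizeBytes, buffer.remaining()));
    if (written < 0) {
      buffer.position(start);
    }
    return written;
  }

  public static void main(String[] args)
  {
    ByteBuffer small = ByteBuffer.allocate(6);
    try {
      writeString(small, "hello", 100); // claims 100 bytes of room, but only 6 exist
    }
    catch (BufferOverflowException e) {
      // the 4-byte length prefix was written before the payload overflowed
      System.out.println("overflow, position left at " + small.position());
    }

    ByteBuffer guarded = ByteBuffer.allocate(6);
    int result = safeWrite(guarded, "hello", 100);
    System.out.println("safeWrite returned " + result + ", position " + guarded.position());
  }
}
```

In the unguarded call the length prefix lands in the buffer before the exception, which is exactly the kind of half-written state a javadoc note would warn about.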

##########
File path: processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import org.apache.druid.segment.data.ObjectStrategy;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Default implementation of {@link TypeStrategy} for all {@link org.apache.druid.segment.serde.ComplexMetricSerde}
+ * implementations that just wraps the {@link ObjectStrategy} they are required to implement.
+ *
+ * This is not likely to be the most efficient way to do things, especially since writing must first produce a byte
+ * array before it can be written to the buffer, but it is cheap and should work correctly, which is important.
+ */
+public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
+{
+  private final ObjectStrategy<T> objectStrategy;
+
+  public ObjectStrategyComplexTypeStrategy(ObjectStrategy<T> objectStrategy)
+  {
+    this.objectStrategy = objectStrategy;
+  }
+
+  @Override
+  public int estimateSizeBytes(@Nullable T value)
+  {
+    byte[] bytes = objectStrategy.toBytes(value);
+    return bytes == null ? 0 : bytes.length;
+  }
+
+  @Override
+  public T read(ByteBuffer buffer)
+  {
+    final int complexLength = buffer.getInt();
+    ByteBuffer dupe = buffer.duplicate();
+    dupe.limit(dupe.position() + complexLength);
+    return objectStrategy.fromByteBuffer(dupe, complexLength);
+  }
+
+  @Override
+  public int write(ByteBuffer buffer, T value, int maxSizeBytes)
+  {
+    byte[] bytes = objectStrategy.toBytes(value);

Review comment:
       Should this method also call `TypeStrategies.checkMaxSize()` first?
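One reading of this suggestion is to validate `maxSizeBytes` against `buffer.remaining()` before doing any work. The sketch below is a minimal, self-contained analogue, assuming the length-prefixed layout that `read` above expects (a 4-byte length followed by the payload). `LengthPrefixedSketch` and its `checkMaxSize` are hypothetical stand-ins: the real `TypeStrategies.checkMaxSize()` signature is not shown in this excerpt, and the two `Function`s stand in for `ObjectStrategy#toBytes` and `ObjectStrategy#fromByteBuffer`. Its `read` also copies the payload on heap and advances the position past it, as the `TypeStrategy#read` javadoc requires:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class LengthPrefixedSketch<T>
{
  // Hypothetical stand-ins for ObjectStrategy#toBytes and ObjectStrategy#fromByteBuffer.
  private final Function<T, byte[]> toBytes;
  private final Function<byte[], T> fromBytes;

  LengthPrefixedSketch(Function<T, byte[]> toBytes, Function<byte[], T> fromBytes)
  {
    this.toBytes = toBytes;
    this.fromBytes = fromBytes;
  }

  // Hypothetical up-front check: fail fast if the caller promises more room than the buffer has.
  static void checkMaxSize(int remaining, int maxSizeBytes)
  {
    if (maxSizeBytes > remaining) {
      throw new IllegalArgumentException(
          "maxSizeBytes[" + maxSizeBytes + "] exceeds buffer remaining[" + remaining + "]"
      );
    }
  }

  // layout: | length (int) | payload bytes |
  int write(ByteBuffer buffer, T value, int maxSizeBytes)
  {
    checkMaxSize(buffer.remaining(), maxSizeBytes);
    byte[] bytes = toBytes.apply(value);
    int needed = Integer.BYTES + bytes.length;
    if (needed > maxSizeBytes) {
      return maxSizeBytes - needed; // nothing written; report the shortfall
    }
    buffer.putInt(bytes.length);
    buffer.put(bytes);
    return needed;
  }

  T read(ByteBuffer buffer)
  {
    int length = buffer.getInt();
    byte[] bytes = new byte[length];
    buffer.get(bytes); // copies the payload on heap and advances the position past it
    return fromBytes.apply(bytes);
  }

  public static void main(String[] args)
  {
    LengthPrefixedSketch<String> strategy = new LengthPrefixedSketch<String>(
        s -> s.getBytes(StandardCharsets.UTF_8),
        b -> new String(b, StandardCharsets.UTF_8)
    );
    ByteBuffer buffer = ByteBuffer.allocate(64);
    int written = strategy.write(buffer, "druid", buffer.remaining());
    buffer.flip();
    System.out.println(written + " bytes, read back: " + strategy.read(buffer));
  }
}
```

With a UTF-8 string codec, writing `"druid"` takes 9 bytes (a 4-byte prefix plus 5 payload bytes), while `maxSizeBytes = 5` reports a shortfall of 4 without touching the buffer.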

##########
File path: core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Floats;
+import com.google.common.primitives.Longs;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TypeStrategies
+{
+  public static final int VALUE_OFFSET = Byte.BYTES;
+  public static final int NULLABLE_LONG_SIZE = Byte.BYTES + Long.BYTES;
+  public static final int NULLABLE_DOUBLE_SIZE = Byte.BYTES + Double.BYTES;
+  public static final int NULLABLE_FLOAT_SIZE = Byte.BYTES + Float.BYTES;
+
+  public static final LongTypeStrategy LONG = new LongTypeStrategy();
+  public static final FloatTypeStrategy FLOAT = new FloatTypeStrategy();
+  public static final DoubleTypeStrategy DOUBLE = new DoubleTypeStrategy();
+  public static final StringTypeStrategy STRING = new StringTypeStrategy();
+  public static final ConcurrentHashMap<String, TypeStrategy<?>> COMPLEX_STRATEGIES = new ConcurrentHashMap<>();
+
+  /**
+   * Get an {@link TypeStrategy} registered to some {@link TypeSignature#getComplexTypeName()}.
+   */
+  @Nullable
+  public static TypeStrategy<?> getComplex(String typeName)
+  {
+    return COMPLEX_STRATEGIES.get(typeName);
+  }
+
+  /**
+   * hmm... this might look familiar... (see ComplexMetrics)
+   *
+   * Register a complex type name -> {@link TypeStrategy} mapping.
+   *
+   * If the specified type name is already used and the supplied {@link TypeStrategy} is not of the
+   * same type as the existing value in the map for said key, an {@link ISE} is thrown.
+   *
+   * @param strategy The {@link TypeStrategy} object to be associated with the 'type' in the map.
+   */
+  public static void registerComplex(String typeName, TypeStrategy<?> strategy)
+  {
+    Preconditions.checkNotNull(typeName);
+    COMPLEX_STRATEGIES.compute(typeName, (key, value) -> {
+      if (value == null) {
+        return strategy;
+      } else {
+        if (!value.getClass().getName().equals(strategy.getClass().getName())) {
+          throw new ISE(
+              "Incompatible strategy for type[%s] already exists. Expected [%s], found [%s].",
+              key,
+              strategy.getClass().getName(),
+              value.getClass().getName()
+          );
+        } else {
+          return value;
+        }
+      }
+    });
+  }
+
+  /**
+   * Clear and set the 'null' byte of a nullable value to {@link NullHandling#IS_NULL_BYTE} in a {@link ByteBuffer} at
+   * the supplied position. This method does not change the buffer position, limit, or mark, because it does not expect
+   * to own the buffer given to it (e.g. buffer aggs)
+   *
+   * Nullable types are stored with a leading byte to indicate if the value is null, followed by the value bytes
+   * (if not null)
+   *
+   * layout: | null (byte) | value |
+   *
+   * @return number of bytes written (always 1)
+   */
+  public static int writeNull(ByteBuffer buffer, int offset)
+  {
+    buffer.put(offset, NullHandling.IS_NULL_BYTE);
+    return 1;
+  }
+
+  /**
+   * Checks if a 'nullable' value's null byte is set to {@link NullHandling#IS_NULL_BYTE}. This method will mask the
+   * value of the null byte to only check if the null bit is set, meaning that the higher bits can be utilized for
+   * flags as necessary (e.g. using high bits to indicate if the value has been set or not for aggregators).
+   *
+   * Note that writing nullable values with the methods of {@link Types} will always clear and set the null byte to
+   * either {@link NullHandling#IS_NULL_BYTE} or {@link NullHandling#IS_NOT_NULL_BYTE}, losing any flag bits.
+   *
+   * layout: | null (byte) | value |
+   */
+  public static boolean isNullableNull(ByteBuffer buffer, int offset)
+  {
+    // use & so that callers can use the high bits of the null byte to pack additional information if necessary
+    return (buffer.get(offset) & NullHandling.IS_NULL_BYTE) == NullHandling.IS_NULL_BYTE;
+  }
+
+  /**
+   * Write a non-null long value to a {@link ByteBuffer} at the supplied offset. The first byte is always cleared and
+   * set to {@link NullHandling#IS_NOT_NULL_BYTE}, the long value is written in the next 8 bytes.
+   *
+   * layout: | null (byte) | long |
+   *
+   * This method does not change the buffer position, limit, or mark, because it does not expect to own the buffer
+   * given to it (e.g. buffer aggs)
+   *
+   * @return number of bytes written (always 9)
+   */
+  public static int writeNullableLong(ByteBuffer buffer, int offset, long value)

Review comment:
       nit: should it be called `writeLongWithNonNullMark` or something rather than `NullableLong`? It reads like the long parameter can be null.
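The nullable layout under discussion can be sketched with absolute `ByteBuffer` accessors, which leave the buffer's position, limit, and mark untouched. This is a minimal sketch under stated assumptions: `writeLongWithNonNullMark` borrows the name suggested above, the other helpers are illustrative rather than Druid's implementation, and the constants assume `NullHandling.IS_NULL_BYTE == 1` and `IS_NOT_NULL_BYTE == 0`. The masked check in `isNullableNull` lets the high bits of the null byte carry flags:

```java
import java.nio.ByteBuffer;

final class NullableLayoutSketch
{
  // Assumed to mirror NullHandling.IS_NULL_BYTE / IS_NOT_NULL_BYTE.
  static final byte IS_NULL_BYTE = 1;
  static final byte IS_NOT_NULL_BYTE = 0;

  // layout: | null (byte) | long |, written with absolute puts so position, limit, and mark are untouched
  static int writeLongWithNonNullMark(ByteBuffer buffer, int offset, long value)
  {
    buffer.put(offset, IS_NOT_NULL_BYTE);
    buffer.putLong(offset + Byte.BYTES, value);
    return Byte.BYTES + Long.BYTES;
  }

  static int writeNull(ByteBuffer buffer, int offset)
  {
    buffer.put(offset, IS_NULL_BYTE);
    return Byte.BYTES;
  }

  // Masking with IS_NULL_BYTE ignores any flag bits packed into the high bits of the null byte.
  static boolean isNullableNull(ByteBuffer buffer, int offset)
  {
    return (buffer.get(offset) & IS_NULL_BYTE) == IS_NULL_BYTE;
  }

  static Long readNullableLong(ByteBuffer buffer, int offset)
  {
    return isNullableNull(buffer, offset) ? null : buffer.getLong(offset + Byte.BYTES);
  }

  public static void main(String[] args)
  {
    ByteBuffer buffer = ByteBuffer.allocate(32);
    writeLongWithNonNullMark(buffer, 4, 42L);
    System.out.println(readNullableLong(buffer, 4) + ", position still " + buffer.position());
    writeNull(buffer, 4);
    System.out.println(readNullableLong(buffer, 4));
  }
}
```

The long parameter itself is never null here; only the leading byte records nullness, which is the ambiguity the naming nit points at.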




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



For additional commands, e-mail: commits-help@druid.apache.org