You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2021/11/09 15:18:31 UTC

[druid] branch master updated: RowBasedCursor: Add column-value-reuse optimization. (#11884)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 945a341  RowBasedCursor: Add column-value-reuse optimization. (#11884)
945a341 is described below

commit 945a341acdf423496483ca611874e46e4f6ecbf5
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Tue Nov 9 07:18:09 2021 -0800

    RowBasedCursor: Add column-value-reuse optimization. (#11884)
    
    * RowBasedCursor: Add column-value-reuse optimization.
    
    Most of the logic is in RowBasedColumnSelectorFactory, although in this
    patch the only caller that enables it (by supplying a row ID) is
    RowBasedCursor. This improves performance of features that use
    RowBasedSegment, like lookup and inline datasources. It's especially
    helpful for inline datasources that contain lengthy arrays, because the
    transformed array can be reused while the cursor stays on the same row.
    
    * Changes from code review.
    
    * Fixes for ColumnCapabilitiesImplTest.
---
 .../segment/RowBasedColumnSelectorFactory.java     | 325 ++++++++++++---------
 .../org/apache/druid/segment/RowBasedCursor.java   |  10 +-
 .../segment/column/ColumnCapabilitiesImpl.java     |  10 +-
 3 files changed, 200 insertions(+), 145 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java
index 6c5b3ef..0d8702a 100644
--- a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java
@@ -31,14 +31,16 @@ import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.data.RangeIndexedInts;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 
@@ -47,76 +49,96 @@ import java.util.function.ToLongFunction;
  */
 public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
 {
-  private final Supplier<T> supplier;
+  private static final long NO_ID = -1;
+
+  private final Supplier<T> rowSupplier;
+
+  @Nullable
+  private final LongSupplier rowIdSupplier;
   private final RowAdapter<T> adapter;
-  private final Supplier<RowSignature> rowSignatureSupplier;
+  private final Supplier<ColumnInspector> columnInspectorSupplier;
   private final boolean throwParseExceptions;
 
-  private RowBasedColumnSelectorFactory(
-      final Supplier<T> supplier,
+  /**
+   * Package-private constructor for {@link RowBasedCursor}. Allows passing in a rowIdSupplier, which enables
+   * column value reuse optimizations.
+   */
+  RowBasedColumnSelectorFactory(
+      final Supplier<T> rowSupplier,
+      @Nullable final LongSupplier rowIdSupplier,
       final RowAdapter<T> adapter,
-      final Supplier<RowSignature> rowSignatureSupplier,
+      final Supplier<ColumnInspector> columnInspectorSupplier,
       final boolean throwParseExceptions
   )
   {
-    this.supplier = supplier;
+    this.rowSupplier = rowSupplier;
+    this.rowIdSupplier = rowIdSupplier;
     this.adapter = adapter;
-    this.rowSignatureSupplier = Preconditions.checkNotNull(rowSignatureSupplier, "rowSignature must be nonnull");
+    this.columnInspectorSupplier =
+        Preconditions.checkNotNull(columnInspectorSupplier, "columnInspectorSupplier must be nonnull");
     this.throwParseExceptions = throwParseExceptions;
   }
 
   /**
    * Create an instance based on any object, along with a {@link RowAdapter} for that object.
    *
-   * @param adapter              adapter for these row objects
-   * @param supplier             supplier of row objects
-   * @param signatureSupplier    will be used for reporting available columns and their capabilities. Note that the this
-   *                             factory will still allow creation of selectors on any named field in the rows, even if
-   *                             it doesn't appear in "rowSignature". (It only needs to be accessible via
-   *                             {@link RowAdapter#columnFunction}.) As a result, you can achieve an untyped mode by
-   *                             passing in {@link RowSignature#empty()}.
-   * @param throwParseExceptions whether numeric selectors should throw parse exceptions or use a default/null value
-   *                             when their inputs are not actually numeric
+   * @param adapter                 adapter for these row objects
+   * @param supplier                supplier of row objects
+   * @param columnInspectorSupplier will be used for reporting available columns and their capabilities. Note that this
+   *                                factory will still allow creation of selectors on any named field in the rows, even if
+   *                                it doesn't appear in "columnInspector". (It only needs to be accessible via
+   *                                {@link RowAdapter#columnFunction}.) As a result, you can achieve an untyped mode by
+   *                                passing in {@link org.apache.druid.segment.column.RowSignature#empty()}.
+   * @param throwParseExceptions    whether numeric selectors should throw parse exceptions or use a default/null value
+   *                                when their inputs are not actually numeric
    */
   public static <RowType> RowBasedColumnSelectorFactory<RowType> create(
       final RowAdapter<RowType> adapter,
       final Supplier<RowType> supplier,
-      final Supplier<RowSignature> signatureSupplier,
+      final Supplier<ColumnInspector> columnInspectorSupplier,
       final boolean throwParseExceptions
   )
   {
-    return new RowBasedColumnSelectorFactory<>(supplier, adapter, signatureSupplier, throwParseExceptions);
+    return new RowBasedColumnSelectorFactory<>(supplier, null, adapter, columnInspectorSupplier, throwParseExceptions);
   }
 
   @Nullable
   static ColumnCapabilities getColumnCapabilities(
-      final RowSignature rowSignature,
+      final ColumnInspector columnInspector,
       final String columnName
   )
   {
     if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
-      // TIME_COLUMN_NAME is handled specially; override the provided rowSignature.
+      // TIME_COLUMN_NAME is handled specially; override the provided inspector.
       return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG);
     } else {
-      final ColumnType valueType = rowSignature.getColumnType(columnName).orElse(null);
-
+      final ColumnCapabilities inspectedCapabilities = columnInspector.getColumnCapabilities(columnName);
 
-      if (valueType != null) {
-        if (valueType.isNumeric()) {
-          return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(valueType);
+      if (inspectedCapabilities != null) {
+        if (inspectedCapabilities.isNumeric()) {
+          return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(inspectedCapabilities);
         }
 
-        if (valueType.isArray()) {
-          return ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(valueType);
+        if (inspectedCapabilities.isArray()) {
+          return ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(inspectedCapabilities);
         }
 
         // Do _not_ set isDictionaryEncoded or hasBitmapIndexes, because Row-based columns do not have those things.
-        // Do not set hasMultipleValues, because even though we might return multiple values, setting it affirmatively
-        // causes expression selectors to always treat us as arrays, so leave as unknown
-        return new ColumnCapabilitiesImpl()
-            .setType(valueType)
+        final ColumnCapabilitiesImpl retVal = new ColumnCapabilitiesImpl()
+            .setType(inspectedCapabilities)
             .setDictionaryValuesUnique(false)
             .setDictionaryValuesSorted(false);
+
+        // Set hasMultipleValues = false if the inspector asserts that there will not be multiple values.
+        //
+        // Note: we do not set hasMultipleValues = true ever, because even though we might return multiple values,
+        // setting it affirmatively causes expression selectors to always treat the column values as arrays. And we
+        // don't want that.
+        if (inspectedCapabilities.hasMultipleValues().isFalse()) {
+          retVal.setHasMultipleValues(false);
+        }
+
+        return retVal;
       } else {
         return null;
       }
@@ -145,148 +167,115 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
 
       return new BaseSingleValueDimensionSelector()
       {
+        private long currentId = NO_ID;
+        private String currentValue;
+
         @Override
         protected String getValue()
         {
-          return extractionFn.apply(timestampFunction.applyAsLong(supplier.get()));
+          updateCurrentValue();
+          return currentValue;
         }
 
         @Override
         public void inspectRuntimeShape(RuntimeShapeInspector inspector)
         {
-          inspector.visit("row", supplier);
+          inspector.visit("row", rowSupplier);
           inspector.visit("extractionFn", extractionFn);
         }
+
+        private void updateCurrentValue()
+        {
+          if (rowIdSupplier == null || rowIdSupplier.getAsLong() != currentId) {
+            currentValue = extractionFn.apply(timestampFunction.applyAsLong(rowSupplier.get()));
+
+            if (rowIdSupplier != null) {
+              currentId = rowIdSupplier.getAsLong();
+            }
+          }
+        }
       };
     } else {
       final Function<T, Object> dimFunction = adapter.columnFunction(dimension);
 
       return new DimensionSelector()
       {
+        private long currentId = NO_ID;
+        private List<String> dimensionValues;
+
         private final RangeIndexedInts indexedInts = new RangeIndexedInts();
 
         @Override
         public IndexedInts getRow()
         {
-          final List<String> dimensionValues = Rows.objectToStrings(dimFunction.apply(supplier.get()));
-          indexedInts.setSize(dimensionValues != null ? dimensionValues.size() : 0);
+          updateCurrentValues();
+          indexedInts.setSize(dimensionValues.size());
           return indexedInts;
         }
 
         @Override
         public ValueMatcher makeValueMatcher(final @Nullable String value)
         {
-          if (extractionFn == null) {
-            return new ValueMatcher()
+          return new ValueMatcher()
+          {
+            @Override
+            public boolean matches()
             {
-              @Override
-              public boolean matches()
-              {
-                final List<String> dimensionValues = Rows.objectToStrings(dimFunction.apply(supplier.get()));
-                if (dimensionValues == null || dimensionValues.isEmpty()) {
-                  return value == null;
-                }
+              updateCurrentValues();
 
-                for (String dimensionValue : dimensionValues) {
-                  if (Objects.equals(NullHandling.emptyToNullIfNeeded(dimensionValue), value)) {
-                    return true;
-                  }
-                }
-                return false;
+              if (dimensionValues.isEmpty()) {
+                return value == null;
               }
 
-              @Override
-              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-              {
-                inspector.visit("row", supplier);
-              }
-            };
-          } else {
-            return new ValueMatcher()
-            {
-              @Override
-              public boolean matches()
-              {
-                final List<String> dimensionValues = Rows.objectToStrings(dimFunction.apply(supplier.get()));
-                if (dimensionValues == null || dimensionValues.isEmpty()) {
-                  return value == null;
-                }
-
-                for (String dimensionValue : dimensionValues) {
-                  if (Objects.equals(extractionFn.apply(NullHandling.emptyToNullIfNeeded(dimensionValue)), value)) {
-                    return true;
-                  }
+              for (String dimensionValue : dimensionValues) {
+                if (Objects.equals(NullHandling.emptyToNullIfNeeded(dimensionValue), value)) {
+                  return true;
                 }
-                return false;
               }
+              return false;
+            }
 
-              @Override
-              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-              {
-                inspector.visit("row", supplier);
-                inspector.visit("extractionFn", extractionFn);
-              }
-            };
-          }
+            @Override
+            public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+            {
+              inspector.visit("row", rowSupplier);
+              inspector.visit("extractionFn", extractionFn);
+            }
+          };
         }
 
         @Override
         public ValueMatcher makeValueMatcher(final Predicate<String> predicate)
         {
           final boolean matchNull = predicate.apply(null);
-          if (extractionFn == null) {
-            return new ValueMatcher()
-            {
-              @Override
-              public boolean matches()
-              {
-                final List<String> dimensionValues = Rows.objectToStrings(dimFunction.apply(supplier.get()));
-                if (dimensionValues == null || dimensionValues.isEmpty()) {
-                  return matchNull;
-                }
 
-                for (String dimensionValue : dimensionValues) {
-                  if (predicate.apply(NullHandling.emptyToNullIfNeeded(dimensionValue))) {
-                    return true;
-                  }
-                }
-                return false;
-              }
+          return new ValueMatcher()
+          {
+            @Override
+            public boolean matches()
+            {
+              updateCurrentValues();
 
-              @Override
-              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-              {
-                inspector.visit("row", supplier);
-                inspector.visit("predicate", predicate);
+              if (dimensionValues.isEmpty()) {
+                return matchNull;
               }
-            };
-          } else {
-            return new ValueMatcher()
-            {
-              @Override
-              public boolean matches()
-              {
-                final List<String> dimensionValues = Rows.objectToStrings(dimFunction.apply(supplier.get()));
-                if (dimensionValues == null || dimensionValues.isEmpty()) {
-                  return matchNull;
-                }
 
-                for (String dimensionValue : dimensionValues) {
-                  if (predicate.apply(extractionFn.apply(NullHandling.emptyToNullIfNeeded(dimensionValue)))) {
-                    return true;
-                  }
+              for (String dimensionValue : dimensionValues) {
+                if (predicate.apply(NullHandling.emptyToNullIfNeeded(dimensionValue))) {
+                  return true;
                 }
-                return false;
               }
+              return false;
+            }
 
-              @Override
-              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-              {
-                inspector.visit("row", supplier);
-                inspector.visit("predicate", predicate);
-              }
-            };
-          }
+            @Override
+            public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+            {
+              inspector.visit("row", rowSupplier);
+              inspector.visit("predicate", predicate);
+              inspector.visit("extractionFn", extractionFn);
+            }
+          };
         }
 
         @Override
@@ -298,10 +287,8 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
         @Override
         public String lookupName(int id)
         {
-          final String value = NullHandling.emptyToNullIfNeeded(
-              Rows.objectToStrings(dimFunction.apply(supplier.get())).get(id)
-          );
-          return extractionFn == null ? value : extractionFn.apply(value);
+          updateCurrentValues();
+          return NullHandling.emptyToNullIfNeeded(dimensionValues.get(id));
         }
 
         @Override
@@ -321,10 +308,8 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
         @Override
         public Object getObject()
         {
-          List<String> dimensionValues = Rows.objectToStrings(dimFunction.apply(supplier.get()));
-          if (dimensionValues == null) {
-            return null;
-          }
+          updateCurrentValues();
+
           if (dimensionValues.size() == 1) {
             return dimensionValues.get(0);
           }
@@ -340,9 +325,67 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
         @Override
         public void inspectRuntimeShape(RuntimeShapeInspector inspector)
         {
-          inspector.visit("row", supplier);
+          inspector.visit("row", rowSupplier);
           inspector.visit("extractionFn", extractionFn);
         }
+
+        private void updateCurrentValues()
+        {
+          if (rowIdSupplier == null || rowIdSupplier.getAsLong() != currentId) {
+            try {
+              final Object rawValue = dimFunction.apply(rowSupplier.get());
+
+              if (rawValue == null || rawValue instanceof String) {
+                final String s = NullHandling.emptyToNullIfNeeded((String) rawValue);
+
+                if (extractionFn == null) {
+                  dimensionValues = Collections.singletonList(s);
+                } else {
+                  dimensionValues = Collections.singletonList(extractionFn.apply(s));
+                }
+              } else if (rawValue instanceof List) {
+                // Consistent behavior with Rows.objectToStrings, but applies extractionFn too.
+                //noinspection rawtypes
+                final List<String> values = new ArrayList<>(((List) rawValue).size());
+
+                //noinspection rawtypes
+                for (final Object item : ((List) rawValue)) {
+                  // Behavior with null item is to convert it to string "null". This is not what most other areas of Druid
+                  // would do when treating a null as a string, but it's consistent with Rows.objectToStrings, which is
+                  // commonly used when retrieving strings from input-row-like objects.
+                  if (extractionFn == null) {
+                    values.add(String.valueOf(item));
+                  } else {
+                    values.add(extractionFn.apply(String.valueOf(item)));
+                  }
+                }
+
+                dimensionValues = values;
+              } else {
+                final List<String> nonExtractedValues = Rows.objectToStrings(rawValue);
+                dimensionValues = new ArrayList<>(nonExtractedValues.size());
+
+                for (final String value : nonExtractedValues) {
+                  final String s = NullHandling.emptyToNullIfNeeded(value);
+
+                  if (extractionFn == null) {
+                    dimensionValues.add(s);
+                  } else {
+                    dimensionValues.add(extractionFn.apply(s));
+                  }
+                }
+              }
+            }
+            catch (Throwable e) {
+              currentId = NO_ID;
+              throw e;
+            }
+
+            if (rowIdSupplier != null) {
+              currentId = rowIdSupplier.getAsLong();
+            }
+          }
+        }
       };
     }
   }
@@ -358,7 +401,7 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
         @Override
         public long getLong()
         {
-          return timestampFunction.applyAsLong(supplier.get());
+          return timestampFunction.applyAsLong(rowSupplier.get());
         }
 
         @Override
@@ -371,7 +414,7 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
         @Override
         public void inspectRuntimeShape(RuntimeShapeInspector inspector)
         {
-          inspector.visit("row", supplier);
+          inspector.visit("row", rowSupplier);
         }
       }
       return new TimeLongColumnSelector();
@@ -426,13 +469,13 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
         @Override
         public void inspectRuntimeShape(RuntimeShapeInspector inspector)
         {
-          inspector.visit("row", supplier);
+          inspector.visit("row", rowSupplier);
         }
 
         @Nullable
         private Object getCurrentValue()
         {
-          return columnFunction.apply(supplier.get());
+          return columnFunction.apply(rowSupplier.get());
         }
 
         @Nullable
@@ -452,6 +495,6 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
   @Override
   public ColumnCapabilities getColumnCapabilities(String columnName)
   {
-    return getColumnCapabilities(rowSignatureSupplier.get(), columnName);
+    return getColumnCapabilities(columnInspectorSupplier.get(), columnName);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java b/processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java
index 863b725..3f83c09 100644
--- a/processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java
+++ b/processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java
@@ -46,6 +46,8 @@ public class RowBasedCursor<RowType> implements Cursor
   private final ColumnSelectorFactory columnSelectorFactory;
   private final ValueMatcher valueMatcher;
 
+  private long rowId = 0;
+
   RowBasedCursor(
       final RowWalker<RowType> rowWalker,
       final RowAdapter<RowType> rowAdapter,
@@ -63,9 +65,10 @@ public class RowBasedCursor<RowType> implements Cursor
     this.descending = descending;
     this.cursorTime = gran.toDateTime(interval.getStartMillis());
     this.columnSelectorFactory = virtualColumns.wrap(
-        RowBasedColumnSelectorFactory.create(
-            rowAdapter,
+        new RowBasedColumnSelectorFactory<>(
             rowWalker::currentRow,
+            () -> rowId,
+            rowAdapter,
             () -> rowSignature,
             false
         )
@@ -104,6 +107,7 @@ public class RowBasedCursor<RowType> implements Cursor
   public void advanceUninterruptibly()
   {
     rowWalker.advance();
+    rowId++;
     advanceToMatchingRow();
   }
 
@@ -122,6 +126,7 @@ public class RowBasedCursor<RowType> implements Cursor
   @Override
   public void reset()
   {
+    rowId = 0;
     rowWalker.reset();
     rowWalker.skipToDateTime(descending ? interval.getEnd().minus(1) : interval.getStart(), descending);
     advanceToMatchingRow();
@@ -131,6 +136,7 @@ public class RowBasedCursor<RowType> implements Cursor
   {
     while (!isDone() && !valueMatcher.matches()) {
       rowWalker.advance();
+      rowId++;
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
index b65bd8e..92b292a 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java
@@ -156,7 +156,7 @@ public class ColumnCapabilitiesImpl implements ColumnCapabilities
    * Create a no frills, simple column with {@link ValueType} set and everything else false
    * @param valueType
    */
-  public static ColumnCapabilitiesImpl createSimpleNumericColumnCapabilities(ColumnType valueType)
+  public static ColumnCapabilitiesImpl createSimpleNumericColumnCapabilities(TypeSignature<ValueType> valueType)
   {
     ColumnCapabilitiesImpl builder = new ColumnCapabilitiesImpl().setType(valueType)
                                                                  .setHasMultipleValues(false)
@@ -191,7 +191,7 @@ public class ColumnCapabilitiesImpl implements ColumnCapabilities
    * and {@link #hasNulls} is not set
    * @param valueType
    */
-  public static ColumnCapabilitiesImpl createSimpleArrayColumnCapabilities(ColumnType valueType)
+  public static ColumnCapabilitiesImpl createSimpleArrayColumnCapabilities(TypeSignature<ValueType> valueType)
   {
     ColumnCapabilitiesImpl builder = new ColumnCapabilitiesImpl().setType(valueType)
                                                                  .setHasMultipleValues(true)
@@ -246,8 +246,14 @@ public class ColumnCapabilitiesImpl implements ColumnCapabilities
     return elementType;
   }
 
+  @JsonProperty
   public ColumnCapabilitiesImpl setType(ColumnType type)
   {
+    return setType((TypeSignature<ValueType>) type);
+  }
+
+  public ColumnCapabilitiesImpl setType(TypeSignature<ValueType> type)
+  {
     Preconditions.checkNotNull(type, "'type' must be nonnull");
     this.type = type.getType();
     this.complexTypeName = type.getComplexTypeName();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org