You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/03/30 09:36:42 UTC

[GitHub] [cassandra] blerer commented on a change in pull request #1510: CASSANDRA-11871: Allow to aggregate by time intervals

blerer commented on a change in pull request #1510:
URL: https://github.com/apache/cassandra/pull/1510#discussion_r835266657



##########
File path: src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
##########
@@ -202,7 +231,281 @@ public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> para
 
                 return ByteBufferUtil.bytes(type.toTimeInMillis(bb));
             }
+
+            @Override
+            public boolean isMonotonic()
+            {
+                return true;
+            }
         };
     }
-}
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     private static abstract class FloorFunction extends NativeScalarFunction
+     {
+         private static final Long ZERO = Long.valueOf(0);
+
+         protected FloorFunction(AbstractType<?> returnType,
+                                 AbstractType<?>... argsType)
+         {
+             super("floor", returnType, argsType);
+         }
+
+         @Override
+         protected boolean isPartialApplicationMonotonic(List<ByteBuffer> partialParameters)
+         {
+             return partialParameters.get(0) == UNRESOLVED
+                     && partialParameters.get(1) != UNRESOLVED
+                     && (partialParameters.size() == 2 || partialParameters.get(2) != UNRESOLVED);
+         }
+
+         public final ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+         {
+             ByteBuffer timeBuffer = parameters.get(0);
+             ByteBuffer durationBuffer = parameters.get(1);
+             Long startingTime = getStartingTime(parameters);
+
+             if (timeBuffer == null || durationBuffer == null || startingTime == null)
+                 return null;
+
+             Long time = toTimeInMillis(timeBuffer);
+             Duration duration = DurationType.instance.compose(durationBuffer);
+
+             validateDuration(duration);
+
+             if (time == null || duration == null)

Review comment:
       Good catch :-)

##########
File path: src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
##########
@@ -162,4 +295,96 @@ public String toString()
                                .append(")")
                                .toString();
     }
+
+    @Override
+    protected int serializedSize(int version)
+    {
+        boolean isPartial = fun instanceof PartialScalarFunction;
+        Function function = isPartial ? ((PartialScalarFunction) fun).getFunction() : fun;
+
+        FunctionName name = function.name();
+        int size =  TypeSizes.sizeof(name.keyspace) + TypeSizes.sizeof(name.name);
+
+        List<AbstractType<?>> argTypes = function.argTypes();
+        size += TypeSizes.sizeofUnsignedVInt(argTypes.size());
+        for (int i = 0, m = argTypes.size(); i < m; i++)
+        {
+            size += sizeOf(argTypes.get(i));
+        }
+
+        size += TypeSizes.sizeof(isPartial);
+
+        if (isPartial)
+        {
+            List<ByteBuffer> partialParameters = ((PartialScalarFunction) fun).getPartialParameters();
+
+            // We use a bitset to track the position of the unresolved arguments
+            size += TypeSizes.sizeofUnsignedVInt(computeBitSet(partialParameters));
+
+            for (int i = 0, m = partialParameters.size(); i < m; i++)
+            {
+                ByteBuffer buffer = partialParameters.get(i);
+                if (buffer != Function.UNRESOLVED)
+                    size += ByteBufferUtil.serializedSizeWithVIntLength(buffer);
+            }
+        }
+
+        int numberOfRemainingArguments = argSelectors.size();
+        size += TypeSizes.sizeofUnsignedVInt(numberOfRemainingArguments);
+        for (int i = 0; i < numberOfRemainingArguments; i++)
+            size += serializer.serializedSize(argSelectors.get(i), version);
+
+        return size;
+    }
+
+    @Override
+    protected void serialize(DataOutputPlus out, int version) throws IOException
+    {
+        boolean isPartial = fun instanceof PartialScalarFunction;
+        Function function = isPartial ? ((PartialScalarFunction) fun).getFunction() : fun;
+
+        FunctionName name = function.name();
+        out.writeUTF(name.keyspace);
+        out.writeUTF(name.name);
+
+        List<AbstractType<?>> argTypes = function.argTypes();
+        int numberOfArguments = argTypes.size();
+        out.writeUnsignedVInt(numberOfArguments);
+
+        for (int i = 0; i < numberOfArguments; i++)
+            writeType(out, argTypes.get(i));
+
+        out.writeBoolean(isPartial);
+
+        if (isPartial)
+        {
+            List<ByteBuffer> partialParameters = ((PartialScalarFunction) fun).getPartialParameters();
+
+            // We use a bitset to track the position of the unresolved arguments
+            out.writeUnsignedVInt(computeBitSet(partialParameters));
+
+            for (int i = 0, m = partialParameters.size(); i < m; i++)
+            {
+                ByteBuffer buffer = partialParameters.get(i);
+                if (buffer != Function.UNRESOLVED)
+                    ByteBufferUtil.writeWithVIntLength(buffer, out);
+            }
+        }
+
+        int numberOfRemainingArguments = argSelectors.size();
+        out.writeUnsignedVInt(numberOfRemainingArguments);
+        for (int i = 0; i < numberOfRemainingArguments; i++)
+            serializer.serialize(argSelectors.get(i), out, version);
+    }
+
+    private int computeBitSet(List<ByteBuffer> partialParameters)
+    {
+        int bitset = 0;

Review comment:
       Good catch. I cannot imagine a good use case for that. A simple assert is probably fine.

##########
File path: src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
##########
@@ -153,6 +264,28 @@ protected void setArg(int i, ByteBuffer value) throws InvalidRequestException
         return fun.returnType();
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof AbstractFunctionSelector))
+            return false;
+
+        AbstractFunctionSelector<?> s = (AbstractFunctionSelector<?>) o;
+
+        return Objects.equal(fun.name(), s.fun.name())
+            && Objects.equal(fun.argTypes(), s.fun.argTypes())

Review comment:
       The `equals` implementation is in `AbstractFunction` and takes the return type into account, which the selector does not know.

##########
File path: src/java/org/apache/cassandra/cql3/functions/Function.java
##########
@@ -29,6 +29,15 @@
 @Unmetered
 public interface Function extends AssignmentTestable
 {
+    /**
+     * A marker buffer used to represent function parameters that cannot be resolved at some stage of CQL processing.
+     * This is used for partial function application in particular.
+     *
+     * @see ScalarFunction#partialApplication(ProtocolVersion, List)

Review comment:
       I removed the reference, as it may change anyway.

##########
File path: src/java/org/apache/cassandra/cql3/functions/Function.java
##########
@@ -29,6 +29,15 @@
 @Unmetered
 public interface Function extends AssignmentTestable
 {
+    /**
+     * A marker buffer used to represent function parameters that cannot be resolved at some stage of CQL processing.
+     * This is used for partial function application in particular.
+     *
+     * @see ScalarFunction#partialApplication(ProtocolVersion, List)
+     * @see Selector.Factory#maybeResolve

Review comment:
       Removed

##########
File path: src/java/org/apache/cassandra/cql3/Duration.java
##########
@@ -395,6 +409,113 @@ private static long append(StringBuilder builder, long dividend, long divisor, S
         return dividend % divisor;
     }
 
+    /**
+     * Rounds a timestamp down to the closest multiple of a duration.
+     *
+     * @param timeInMillis the time to round in millisecond
+     * @param duration the duration
+     * @param startingTimeInMillis the time offset in milliseconds
+     * @return the timestamp rounded down to the closest multiple of the duration
+     */
+    public static long floorTimestamp(long timeInMillis, Duration duration, long startingTimeInMillis)
+    {
+        checkFalse(startingTimeInMillis > timeInMillis, "The floor function starting time is greater than the provided time");
+        checkFalse(duration.isNegative(), "Negative durations are not supported by the floor function");
+
+        // If the duration does not contains any months we can ignore daylight saving,
+        // as time zones are not supported, and simply look at the milliseconds
+        if (duration.months == 0)
+        {
+            // We can ignore daylight saving as time zones are not supported
+            long durationInMillis = (duration.days * MILLIS_PER_DAY) + (duration.nanoseconds / NANOS_PER_MILLI);
+
+            // If the duration is smaller than millisecond
+            if (durationInMillis == 0)
+                return timeInMillis;
+
+            long delta = (timeInMillis - startingTimeInMillis) % durationInMillis;
+            return timeInMillis - delta;
+        }
+
+        /*
+         * Otherwise, we resort to Calendar for the computation.
+         * What we're trying to compute is the largest integer 'multiplier' value such that
+         *   startingTimeMillis + (multiplier * duration) <= timeInMillis
+         * at which point we want to return 'startingTimeMillis + (multiplier * duration)'.
+         *
+         * One option would be to add 'duration' to 'statingTimeMillis' in a loop until we
+         * cross 'timeInMillis' and return how many iterator we did. But this might be slow if there is very many
+         * steps.
+         *
+         * So instead we first estimate 'multiplier' using the number of months between 'startingTimeMillis'
+         * and 'timeInMillis' ('durationInMonths' below) and the duration months. As the real computation
+         * should also take the 'days' and 'nanoseconds' parts of the duration, this multiplier may overshoot,
+         * so we detect it and work back from that, decreasing the multiplier until we find the proper one.
+         */
+
+        Calendar calendar = CALENDAR_PROVIDER.get();
+
+        calendar.setTimeInMillis(timeInMillis);
+        int year = calendar.get(Calendar.YEAR);
+        int month = calendar.get(Calendar.MONTH);
+
+        calendar.setTimeInMillis(startingTimeInMillis);
+        int startingYear = calendar.get(Calendar.YEAR);
+        int startingMonth = calendar.get(Calendar.MONTH);
+
+        int durationInMonths = (year - startingYear) * MONTHS_PER_YEAR + (month - startingMonth);
+        int multiplier = durationInMonths / duration.months;
+
+        calendar.add(Calendar.MONTH, multiplier * duration.months);
+
+        // If the duration was only containing months, we are done.
+        if (duration.days == 0 && duration.nanoseconds == 0)
+            return calendar.getTimeInMillis();
+
+        long durationInMillis = (duration.days * MILLIS_PER_DAY) + (duration.nanoseconds / NANOS_PER_MILLI);

Review comment:
       It is not calculated twice within the same path. As the computation involves a division, I wanted to avoid that cost until we reach the point where we need it.
   Does that make sense to you? 

##########
File path: src/java/org/apache/cassandra/cql3/functions/Function.java
##########
@@ -29,6 +29,15 @@
 @Unmetered
 public interface Function extends AssignmentTestable
 {
+    /**
+     * A marker buffer used to represent function parameters that cannot be resolved at some stage of CQL processing.
+     * This is used for partial function application in particular.
+     *
+     * @see ScalarFunction#partialApplication(ProtocolVersion, List)
+     * @see Selector.Factory#maybeResolve
+     */
+    public static final ByteBuffer UNRESOLVED = ByteBuffer.allocate(0);

Review comment:
       I plan to use it with aggregates.

##########
File path: src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
##########
@@ -202,7 +231,281 @@ public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> para
 
                 return ByteBufferUtil.bytes(type.toTimeInMillis(bb));
             }
+
+            @Override
+            public boolean isMonotonic()
+            {
+                return true;
+            }
         };
     }
-}
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     private static abstract class FloorFunction extends NativeScalarFunction
+     {
+         private static final Long ZERO = Long.valueOf(0);
+
+         protected FloorFunction(AbstractType<?> returnType,
+                                 AbstractType<?>... argsType)
+         {
+             super("floor", returnType, argsType);
+         }
+
+         @Override
+         protected boolean isPartialApplicationMonotonic(List<ByteBuffer> partialParameters)
+         {
+             return partialParameters.get(0) == UNRESOLVED
+                     && partialParameters.get(1) != UNRESOLVED
+                     && (partialParameters.size() == 2 || partialParameters.get(2) != UNRESOLVED);
+         }
+
+         public final ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+         {
+             ByteBuffer timeBuffer = parameters.get(0);
+             ByteBuffer durationBuffer = parameters.get(1);
+             Long startingTime = getStartingTime(parameters);
+
+             if (timeBuffer == null || durationBuffer == null || startingTime == null)
+                 return null;
+
+             Long time = toTimeInMillis(timeBuffer);
+             Duration duration = DurationType.instance.compose(durationBuffer);
+
+             validateDuration(duration);
+
+             if (time == null || duration == null)
+                 return null;
+
+             long floor = Duration.floorTimestamp(time, duration, startingTime);
+
+             return fromTimeInMillis(floor);
+         }
+
+         /**
+          * Returns the time to use as the starting time.
+          *
+          * @param parameters the function parameters
+          * @return the time to use as the starting time
+          */
+         private Long getStartingTime(List<ByteBuffer> parameters)
+         {
+             if (parameters.size() == 3)
+             {
+                 ByteBuffer startingTimeBuffer = parameters.get(2);
+
+                 if (startingTimeBuffer == null)
+                     return null;
+
+                 return toStartingTimeInMillis(startingTimeBuffer);
+             }
+
+             return ZERO;
+         }
+
+         /**
+          * Validates that the duration has the correct precision.
+          * @param duration the duration to validate.
+          */
+         protected void validateDuration(Duration duration)
+         {
+             // Checks that the duration has no data bellow milliseconds. We can do that by checking that the last
+             // 6 bits of the number of nanoseconds are all zeros. The compiler will replace the call to
+             // numberOfTrailingZeros by a TZCNT instruction.
+             if (Long.numberOfTrailingZeros(duration.getNanoseconds()) < 6)
+                 throw invalidRequest("The floor cannot be computed for the %s duration as precision is below 1 millisecond", duration);
+         }
+
+         /**
+          * Serializes the specified time.
+          *
+          * @param timeInMillis the time in milliseconds
+          * @return the serialized time
+          */
+         protected abstract ByteBuffer fromTimeInMillis(long timeInMillis);
+
+         /**
+          * Deserializes the specified input time.
+          *
+          * @param bytes the serialized time
+          * @return the time in milliseconds
+          */
+         protected abstract Long toTimeInMillis(ByteBuffer bytes);
+
+         /**
+          * Deserializes the specified starting time.
+          *
+          * @param bytes the serialized starting time
+          * @return the starting time in milliseconds
+          */
+         protected abstract Long toStartingTimeInMillis(ByteBuffer bytes);
+     };
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     public static final class FloorTimestampFunction extends FloorFunction
+     {
+         public static FloorTimestampFunction newInstance()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance);
+         }
+
+         public static FloorTimestampFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance,
+                                               TimestampType.instance);
+         }
+
+         private FloorTimestampFunction(AbstractType<?> returnType,
+                                        AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+     };
+
+     /**
+      * Function that rounds a timeUUID down to the closest multiple of a duration.
+      */
+     public static final class FloorTimeUuidFunction extends FloorFunction
+     {
+         public static FloorTimeUuidFunction newInstance()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance);
+         }
+
+         public static FloorTimeUuidFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance,
+                                              TimestampType.instance);
+         }
+
+         private FloorTimeUuidFunction(AbstractType<?> returnType,
+                                       AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return UUIDGen.getAdjustedTimestamp(UUIDGen.getUUID(bytes));
+         }
+     };
+
+     /**
+      * Function that rounds a date down to the closest multiple of a duration.
+      */
+     public static final class FloorDateFunction extends FloorFunction
+     {
+         public static FloorDateFunction newInstance()
+         {
+             return new FloorDateFunction(SimpleDateType.instance,
+                                          SimpleDateType.instance,
+                                          DurationType.instance);
+         }
+
+         public static FloorDateFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorDateFunction(SimpleDateType.instance,
+                                          SimpleDateType.instance,
+                                          DurationType.instance,
+                                          SimpleDateType.instance);
+         }
+
+         private FloorDateFunction(AbstractType<?> returnType,
+                                   AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return SimpleDateType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return SimpleDateType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return SimpleDateType.instance.toTimeInMillis(bytes);
+         }
+
+         @Override
+         protected void validateDuration(Duration duration)
+         {
+             // Checks that the duration has no data below days.
+             if (duration.getNanoseconds() != 0)
+                 throw invalidRequest("The floor on %s values cannot be computed for the %s duration as precision is below 1 day",
+                                      SimpleDateType.instance.asCQL3Type(), duration);
+         }
+     };
+
+     /**
+      * Function that rounds a time down to the closest multiple of a duration.
+      */
+     public static final NativeScalarFunction floorTime = new NativeScalarFunction("floor", TimeType.instance, TimeType.instance, DurationType.instance)

Review comment:
       I agree there are some similarities, but there are also some differences. It feels simpler to me this way, but that might just be my personal taste.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org