You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/03/23 23:52:18 UTC

[GitHub] [cassandra] yifan-c commented on a change in pull request #1510: CASSANDRA-11871: Allow to aggregate by time intervals

yifan-c commented on a change in pull request #1510:
URL: https://github.com/apache/cassandra/pull/1510#discussion_r833773541



##########
File path: src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
##########
@@ -162,4 +295,96 @@ public String toString()
                                .append(")")
                                .toString();
     }
+
+    @Override
+    protected int serializedSize(int version)
+    {
+        boolean isPartial = fun instanceof PartialScalarFunction;
+        Function function = isPartial ? ((PartialScalarFunction) fun).getFunction() : fun;
+
+        FunctionName name = function.name();
+        int size =  TypeSizes.sizeof(name.keyspace) + TypeSizes.sizeof(name.name);
+
+        List<AbstractType<?>> argTypes = function.argTypes();
+        size += TypeSizes.sizeofUnsignedVInt(argTypes.size());
+        for (int i = 0, m = argTypes.size(); i < m; i++)
+        {
+            size += sizeOf(argTypes.get(i));
+        }
+
+        size += TypeSizes.sizeof(isPartial);
+
+        if (isPartial)
+        {
+            List<ByteBuffer> partialParameters = ((PartialScalarFunction) fun).getPartialParameters();
+
+            // We use a bitset to track the position of the unresolved arguments
+            size += TypeSizes.sizeofUnsignedVInt(computeBitSet(partialParameters));
+
+            for (int i = 0, m = partialParameters.size(); i < m; i++)
+            {
+                ByteBuffer buffer = partialParameters.get(i);
+                if (buffer != Function.UNRESOLVED)
+                    size += ByteBufferUtil.serializedSizeWithVIntLength(buffer);
+            }
+        }
+
+        int numberOfRemainingArguments = argSelectors.size();
+        size += TypeSizes.sizeofUnsignedVInt(numberOfRemainingArguments);
+        for (int i = 0; i < numberOfRemainingArguments; i++)
+            size += serializer.serializedSize(argSelectors.get(i), version);
+
+        return size;
+    }
+
+    @Override
+    protected void serialize(DataOutputPlus out, int version) throws IOException
+    {
+        boolean isPartial = fun instanceof PartialScalarFunction;
+        Function function = isPartial ? ((PartialScalarFunction) fun).getFunction() : fun;
+
+        FunctionName name = function.name();
+        out.writeUTF(name.keyspace);
+        out.writeUTF(name.name);
+
+        List<AbstractType<?>> argTypes = function.argTypes();
+        int numberOfArguments = argTypes.size();
+        out.writeUnsignedVInt(numberOfArguments);
+
+        for (int i = 0; i < numberOfArguments; i++)
+            writeType(out, argTypes.get(i));
+
+        out.writeBoolean(isPartial);
+
+        if (isPartial)
+        {
+            List<ByteBuffer> partialParameters = ((PartialScalarFunction) fun).getPartialParameters();
+
+            // We use a bitset to track the position of the unresolved arguments
+            out.writeUnsignedVInt(computeBitSet(partialParameters));
+
+            for (int i = 0, m = partialParameters.size(); i < m; i++)
+            {
+                ByteBuffer buffer = partialParameters.get(i);
+                if (buffer != Function.UNRESOLVED)
+                    ByteBufferUtil.writeWithVIntLength(buffer, out);
+            }
+        }
+
+        int numberOfRemainingArguments = argSelectors.size();
+        out.writeUnsignedVInt(numberOfRemainingArguments);
+        for (int i = 0; i < numberOfRemainingArguments; i++)
+            serializer.serialize(argSelectors.get(i), out, version);
+    }
+
+    private int computeBitSet(List<ByteBuffer> partialParameters)
+    {
+        int bitset = 0;

Review comment:
       nit: add an assertion on the size of the parameters list. The bitset is an `int`, so it only works properly if the size is <= 32. Is there a scenario with more than 32 parameters?

##########
File path: src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
##########
@@ -202,7 +231,281 @@ public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> para
 
                 return ByteBufferUtil.bytes(type.toTimeInMillis(bb));
             }
+
+            @Override
+            public boolean isMonotonic()
+            {
+                return true;
+            }
         };
     }
-}
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     private static abstract class FloorFunction extends NativeScalarFunction
+     {
+         private static final Long ZERO = Long.valueOf(0);
+
+         protected FloorFunction(AbstractType<?> returnType,
+                                 AbstractType<?>... argsType)
+         {
+             super("floor", returnType, argsType);
+         }
+
+         @Override
+         protected boolean isPartialApplicationMonotonic(List<ByteBuffer> partialParameters)
+         {
+             return partialParameters.get(0) == UNRESOLVED
+                     && partialParameters.get(1) != UNRESOLVED
+                     && (partialParameters.size() == 2 || partialParameters.get(2) != UNRESOLVED);
+         }
+
+         public final ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+         {
+             ByteBuffer timeBuffer = parameters.get(0);
+             ByteBuffer durationBuffer = parameters.get(1);
+             Long startingTime = getStartingTime(parameters);
+
+             if (timeBuffer == null || durationBuffer == null || startingTime == null)
+                 return null;
+
+             Long time = toTimeInMillis(timeBuffer);
+             Duration duration = DurationType.instance.compose(durationBuffer);
+
+             validateDuration(duration);
+
+             if (time == null || duration == null)
+                 return null;
+
+             long floor = Duration.floorTimestamp(time, duration, startingTime);
+
+             return fromTimeInMillis(floor);
+         }
+
+         /**
+          * Returns the time to use as the starting time.
+          *
+          * @param parameters the function parameters
+          * @return the time to use as the starting time
+          */
+         private Long getStartingTime(List<ByteBuffer> parameters)
+         {
+             if (parameters.size() == 3)
+             {
+                 ByteBuffer startingTimeBuffer = parameters.get(2);
+
+                 if (startingTimeBuffer == null)
+                     return null;
+
+                 return toStartingTimeInMillis(startingTimeBuffer);
+             }
+
+             return ZERO;
+         }
+
+         /**
+          * Validates that the duration has the correct precision.
+          * @param duration the duration to validate.
+          */
+         protected void validateDuration(Duration duration)
+         {
+             // Checks that the duration has no data bellow milliseconds. We can do that by checking that the last
+             // 6 bits of the number of nanoseconds are all zeros. The compiler will replace the call to
+             // numberOfTrailingZeros by a TZCNT instruction.
+             if (Long.numberOfTrailingZeros(duration.getNanoseconds()) < 6)
+                 throw invalidRequest("The floor cannot be computed for the %s duration as precision is below 1 millisecond", duration);
+         }
+
+         /**
+          * Serializes the specified time.
+          *
+          * @param timeInMillis the time in milliseconds
+          * @return the serialized time
+          */
+         protected abstract ByteBuffer fromTimeInMillis(long timeInMillis);
+
+         /**
+          * Deserializes the specified input time.
+          *
+          * @param bytes the serialized time
+          * @return the time in milliseconds
+          */
+         protected abstract Long toTimeInMillis(ByteBuffer bytes);
+
+         /**
+          * Deserializes the specified starting time.
+          *
+          * @param bytes the serialized starting time
+          * @return the starting time in milliseconds
+          */
+         protected abstract Long toStartingTimeInMillis(ByteBuffer bytes);
+     };
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     public static final class FloorTimestampFunction extends FloorFunction
+     {
+         public static FloorTimestampFunction newInstance()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance);
+         }
+
+         public static FloorTimestampFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance,
+                                               TimestampType.instance);
+         }
+
+         private FloorTimestampFunction(AbstractType<?> returnType,
+                                        AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+     };
+
+     /**
+      * Function that rounds a timeUUID down to the closest multiple of a duration.
+      */
+     public static final class FloorTimeUuidFunction extends FloorFunction
+     {
+         public static FloorTimeUuidFunction newInstance()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance);
+         }
+
+         public static FloorTimeUuidFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance,
+                                              TimestampType.instance);
+         }
+
+         private FloorTimeUuidFunction(AbstractType<?> returnType,
+                                       AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return UUIDGen.getAdjustedTimestamp(UUIDGen.getUUID(bytes));
+         }
+     };

Review comment:
       nit: unnecessary `;`

##########
File path: src/java/org/apache/cassandra/cql3/Duration.java
##########
@@ -395,6 +409,113 @@ private static long append(StringBuilder builder, long dividend, long divisor, S
         return dividend % divisor;
     }
 
+    /**
+     * Rounds a timestamp down to the closest multiple of a duration.
+     *
+     * @param timeInMillis the time to round in millisecond
+     * @param duration the duration
+     * @param startingTimeInMillis the time offset in milliseconds
+     * @return the timestamp rounded down to the closest multiple of the duration
+     */
+    public static long floorTimestamp(long timeInMillis, Duration duration, long startingTimeInMillis)
+    {
+        checkFalse(startingTimeInMillis > timeInMillis, "The floor function starting time is greater than the provided time");
+        checkFalse(duration.isNegative(), "Negative durations are not supported by the floor function");
+
+        // If the duration does not contains any months we can ignore daylight saving,
+        // as time zones are not supported, and simply look at the milliseconds
+        if (duration.months == 0)
+        {
+            // We can ignore daylight saving as time zones are not supported
+            long durationInMillis = (duration.days * MILLIS_PER_DAY) + (duration.nanoseconds / NANOS_PER_MILLI);
+
+            // If the duration is smaller than millisecond
+            if (durationInMillis == 0)
+                return timeInMillis;
+
+            long delta = (timeInMillis - startingTimeInMillis) % durationInMillis;
+            return timeInMillis - delta;
+        }
+
+        /*
+         * Otherwise, we resort to Calendar for the computation.
+         * What we're trying to compute is the largest integer 'multiplier' value such that
+         *   startingTimeMillis + (multiplier * duration) <= timeInMillis
+         * at which point we want to return 'startingTimeMillis + (multiplier * duration)'.
+         *
+         * One option would be to add 'duration' to 'statingTimeMillis' in a loop until we
+         * cross 'timeInMillis' and return how many iterator we did. But this might be slow if there is very many
+         * steps.
+         *
+         * So instead we first estimate 'multiplier' using the number of months between 'startingTimeMillis'
+         * and 'timeInMillis' ('durationInMonths' below) and the duration months. As the real computation
+         * should also take the 'days' and 'nanoseconds' parts of the duration, this multiplier may overshoot,
+         * so we detect it and work back from that, decreasing the multiplier until we find the proper one.
+         */
+
+        Calendar calendar = CALENDAR_PROVIDER.get();
+
+        calendar.setTimeInMillis(timeInMillis);
+        int year = calendar.get(Calendar.YEAR);
+        int month = calendar.get(Calendar.MONTH);
+
+        calendar.setTimeInMillis(startingTimeInMillis);
+        int startingYear = calendar.get(Calendar.YEAR);
+        int startingMonth = calendar.get(Calendar.MONTH);
+
+        int durationInMonths = (year - startingYear) * MONTHS_PER_YEAR + (month - startingMonth);
+        int multiplier = durationInMonths / duration.months;
+
+        calendar.add(Calendar.MONTH, multiplier * duration.months);
+
+        // If the duration was only containing months, we are done.
+        if (duration.days == 0 && duration.nanoseconds == 0)
+            return calendar.getTimeInMillis();
+
+        long durationInMillis = (duration.days * MILLIS_PER_DAY) + (duration.nanoseconds / NANOS_PER_MILLI);

Review comment:
       `(duration.days * MILLIS_PER_DAY) + (duration.nanoseconds / NANOS_PER_MILLI);` is calculated twice: once here and once at line #430, inside the `if (duration.months == 0)` branch.
   It can be calculated once, before that `if`.

##########
File path: test/unit/org/apache/cassandra/cql3/DurationTest.java
##########
@@ -174,7 +175,211 @@ private long toMillis(String timeAsString)
         return DateUtils.truncate(date, Calendar.SECOND).getTime();
     }
 
-    public void assertInvalidDuration(String duration, String expectedErrorMessage)
+    @Test
+    public void testInvalidFloorTimestamp()
+    {
+        try
+        {
+            floorTimestamp("2016-09-27T16:12:00", "2h", "2017-09-01T00:00:00");
+            Assert.fail();
+        }
+        catch (InvalidRequestException e)
+        {
+            assertEquals("The floor function starting time is greater than the provided time", e.getMessage());
+        }
+
+        try
+        {
+            floorTimestamp("2016-09-27T16:12:00", "-2h", "2016-09-27T00:00:00");
+            Assert.fail();
+        }
+        catch (InvalidRequestException e)
+        {
+            assertEquals("Negative durations are not supported by the floor function", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testFloorTimestamp()
+    {
+        //~ Test with a duration in hours //////////////////////////////////////////////
+
+        // Test floor with a timestamp equals to the start time
+        long result = floorTimestamp("2016-09-27T16:12:00", "2h", "2016-09-27T16:12:00");
+        assertEquals(toMillis("2016-09-27T16:12:00"), result);

Review comment:
       I find assertion errors easier to read when the values are in a human-readable format.
   For example, this is hard to correlate with the test:
   ```
   java.lang.AssertionError: 
   Expected :1473469200000
   Actual   :1473494400000
   ```
   
   So instead of using `toMillis`, how about converting the millis value to a date-time string and comparing the string values?

##########
File path: test/unit/org/apache/cassandra/cql3/DurationTest.java
##########
@@ -174,7 +175,211 @@ private long toMillis(String timeAsString)
         return DateUtils.truncate(date, Calendar.SECOND).getTime();
     }
 
-    public void assertInvalidDuration(String duration, String expectedErrorMessage)
+    @Test
+    public void testInvalidFloorTimestamp()
+    {
+        try
+        {
+            floorTimestamp("2016-09-27T16:12:00", "2h", "2017-09-01T00:00:00");
+            Assert.fail();
+        }
+        catch (InvalidRequestException e)
+        {
+            assertEquals("The floor function starting time is greater than the provided time", e.getMessage());
+        }
+
+        try
+        {
+            floorTimestamp("2016-09-27T16:12:00", "-2h", "2016-09-27T00:00:00");
+            Assert.fail();
+        }
+        catch (InvalidRequestException e)
+        {
+            assertEquals("Negative durations are not supported by the floor function", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testFloorTimestamp()
+    {
+        //~ Test with a duration in hours //////////////////////////////////////////////
+
+        // Test floor with a timestamp equals to the start time
+        long result = floorTimestamp("2016-09-27T16:12:00", "2h", "2016-09-27T16:12:00");
+        assertEquals(toMillis("2016-09-27T16:12:00"), result);
+
+        // Test floor with a duration equals to zero
+        result = floorTimestamp("2016-09-27T18:12:00", "0h", "2016-09-27T16:12:00");
+        assertEquals(toMillis("2016-09-27T18:12:00"), result);
+
+        // Test floor with a timestamp exactly equals to the start time + (1 x duration)
+        result = floorTimestamp("2016-09-27T18:12:00", "2h", "2016-09-27T16:12:00");
+        assertEquals(toMillis("2016-09-27T18:12:00"), result);
+
+        // Test floor with a timestamp in the first bucket
+        result = floorTimestamp("2016-09-27T16:13:00", "2h", "2016-09-27T16:12:00");
+        assertEquals(toMillis("2016-09-27T16:12:00"), result);
+
+        // Test floor with a timestamp in another bucket
+        result = floorTimestamp("2016-09-27T16:12:00", "2h", "2016-09-27T00:00:00");
+        assertEquals(toMillis("2016-09-27T16:00:00"), result);
+
+
+        //~ Test with a duration in days //////////////////////////////////////////////
+
+        // Test floor with a start time at the beginning of the month
+        result = floorTimestamp("2016-09-27T16:12:00", "2d", "2016-09-01T00:00:00");
+        assertEquals(toMillis("2016-09-27T00:00:00"), result);
+
+        // Test floor with a start time in the previous month
+        result = floorTimestamp("2016-09-27T16:12:00", "2d", "2016-08-01T00:00:00");
+        assertEquals(toMillis("2016-09-26T00:00:00"), result);
+
+
+        //~ Test with a duration in days and hours ////////////////////////////////////
+
+        result = floorTimestamp("2016-09-27T16:12:00", "2d12h", "2016-09-01T00:00:00");
+        assertEquals(toMillis("2016-09-26T00:00:00"), result);
+
+
+        //~ Test with a duration in months ////////////////////////////////////////////
+
+        // Test floor with a timestamp equals to the start time
+        result = floorTimestamp("2016-09-01T00:00:00", "2mo", "2016-09-01T00:00:00");
+        assertEquals(toMillis("2016-09-01T00:00:00"), result);
+
+        // Test floor with a timestamp in the first bucket
+        result = floorTimestamp("2016-09-27T16:12:00", "2mo", "2016-09-01T00:00:00");
+        assertEquals(toMillis("2016-09-01T00:00:00"), result);
+
+        // Test floor with a start time at the beginning of the year (LEAP YEAR)
+        result = floorTimestamp("2016-09-27T16:12:00", "1mo", "2016-01-01T00:00:00");
+        assertEquals(toMillis("2016-09-01T00:00:00"), result);
+
+        // Test floor with a start time at the beginning of the previous year
+        result = floorTimestamp("2016-09-27T16:12:00", "2mo", "2015-01-01T00:00:00");
+        assertEquals(toMillis("2016-09-01T00:00:00"), result);
+
+        // Test floor with a start time in the previous year
+        result = floorTimestamp("2016-09-27T16:12:00", "2mo", "2015-02-02T00:00:00");
+        assertEquals(toMillis("2016-08-02T00:00:00"), result);
+
+
+        //~ Test with a duration in months and days ////////////////////////////////////
+
+        // Test floor with a start time at the beginning of the year (LEAP YEAR)
+        result = floorTimestamp("2016-09-27T16:12:00", "2mo2d", "2016-01-01T00:00:00");
+        assertEquals(toMillis("2016-09-09T00:00:00"), result);

Review comment:
       nit: it would be interesting to add a test case where the duration specifies more than 2 fields, e.g.
   ```
           result = floorTimestamp("2016-09-27T16:12:00", "2mo2d8h", "2016-01-01T00:00:00");
           assertEquals(toMillis("2016-09-10T08:00:00"), result);
   ```

##########
File path: src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
##########
@@ -65,6 +76,12 @@ public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> para
             {
                 return type.now();
             }
+
+            @Override
+            public boolean isPure()
+            {
+                return false;

Review comment:
       nit: maybe add a comment explaining why `now()` is not pure: it returns non-identical results for identical arguments.

##########
File path: src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
##########
@@ -153,6 +264,28 @@ protected void setArg(int i, ByteBuffer value) throws InvalidRequestException
         return fun.returnType();
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof AbstractFunctionSelector))
+            return false;
+
+        AbstractFunctionSelector<?> s = (AbstractFunctionSelector<?>) o;
+
+        return Objects.equal(fun.name(), s.fun.name())
+            && Objects.equal(fun.argTypes(), s.fun.argTypes())

Review comment:
       Why compare only these 2 selected fields instead of using `Objects.equal(fun, s.fun)`?

##########
File path: src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
##########
@@ -202,7 +231,281 @@ public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> para
 
                 return ByteBufferUtil.bytes(type.toTimeInMillis(bb));
             }
+
+            @Override
+            public boolean isMonotonic()
+            {
+                return true;
+            }
         };
     }
-}
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     private static abstract class FloorFunction extends NativeScalarFunction
+     {
+         private static final Long ZERO = Long.valueOf(0);
+
+         protected FloorFunction(AbstractType<?> returnType,
+                                 AbstractType<?>... argsType)
+         {
+             super("floor", returnType, argsType);
+         }
+
+         @Override
+         protected boolean isPartialApplicationMonotonic(List<ByteBuffer> partialParameters)
+         {
+             return partialParameters.get(0) == UNRESOLVED
+                     && partialParameters.get(1) != UNRESOLVED
+                     && (partialParameters.size() == 2 || partialParameters.get(2) != UNRESOLVED);
+         }
+
+         public final ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+         {
+             ByteBuffer timeBuffer = parameters.get(0);
+             ByteBuffer durationBuffer = parameters.get(1);
+             Long startingTime = getStartingTime(parameters);
+
+             if (timeBuffer == null || durationBuffer == null || startingTime == null)
+                 return null;
+
+             Long time = toTimeInMillis(timeBuffer);
+             Duration duration = DurationType.instance.compose(durationBuffer);
+
+             validateDuration(duration);
+
+             if (time == null || duration == null)
+                 return null;
+
+             long floor = Duration.floorTimestamp(time, duration, startingTime);
+
+             return fromTimeInMillis(floor);
+         }
+
+         /**
+          * Returns the time to use as the starting time.
+          *
+          * @param parameters the function parameters
+          * @return the time to use as the starting time
+          */
+         private Long getStartingTime(List<ByteBuffer> parameters)
+         {
+             if (parameters.size() == 3)
+             {
+                 ByteBuffer startingTimeBuffer = parameters.get(2);
+
+                 if (startingTimeBuffer == null)
+                     return null;
+
+                 return toStartingTimeInMillis(startingTimeBuffer);
+             }
+
+             return ZERO;
+         }
+
+         /**
+          * Validates that the duration has the correct precision.
+          * @param duration the duration to validate.
+          */
+         protected void validateDuration(Duration duration)
+         {
+             // Checks that the duration has no data bellow milliseconds. We can do that by checking that the last
+             // 6 bits of the number of nanoseconds are all zeros. The compiler will replace the call to
+             // numberOfTrailingZeros by a TZCNT instruction.
+             if (Long.numberOfTrailingZeros(duration.getNanoseconds()) < 6)
+                 throw invalidRequest("The floor cannot be computed for the %s duration as precision is below 1 millisecond", duration);
+         }
+
+         /**
+          * Serializes the specified time.
+          *
+          * @param timeInMillis the time in milliseconds
+          * @return the serialized time
+          */
+         protected abstract ByteBuffer fromTimeInMillis(long timeInMillis);
+
+         /**
+          * Deserializes the specified input time.
+          *
+          * @param bytes the serialized time
+          * @return the time in milliseconds
+          */
+         protected abstract Long toTimeInMillis(ByteBuffer bytes);
+
+         /**
+          * Deserializes the specified starting time.
+          *
+          * @param bytes the serialized starting time
+          * @return the starting time in milliseconds
+          */
+         protected abstract Long toStartingTimeInMillis(ByteBuffer bytes);
+     };
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     public static final class FloorTimestampFunction extends FloorFunction
+     {
+         public static FloorTimestampFunction newInstance()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance);
+         }
+
+         public static FloorTimestampFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance,
+                                               TimestampType.instance);
+         }
+
+         private FloorTimestampFunction(AbstractType<?> returnType,
+                                        AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+     };
+
+     /**
+      * Function that rounds a timeUUID down to the closest multiple of a duration.
+      */
+     public static final class FloorTimeUuidFunction extends FloorFunction
+     {
+         public static FloorTimeUuidFunction newInstance()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance);
+         }
+
+         public static FloorTimeUuidFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance,
+                                              TimestampType.instance);
+         }
+
+         private FloorTimeUuidFunction(AbstractType<?> returnType,
+                                       AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return UUIDGen.getAdjustedTimestamp(UUIDGen.getUUID(bytes));
+         }
+     };
+
+     /**
+      * Function that rounds a date down to the closest multiple of a duration.
+      */
+     public static final class FloorDateFunction extends FloorFunction
+     {
+         public static FloorDateFunction newInstance()
+         {
+             return new FloorDateFunction(SimpleDateType.instance,
+                                          SimpleDateType.instance,
+                                          DurationType.instance);
+         }
+
+         public static FloorDateFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorDateFunction(SimpleDateType.instance,
+                                          SimpleDateType.instance,
+                                          DurationType.instance,
+                                          SimpleDateType.instance);
+         }
+
+         private FloorDateFunction(AbstractType<?> returnType,
+                                   AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return SimpleDateType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return SimpleDateType.instance.toTimeInMillis(bytes);
+         }
+
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return SimpleDateType.instance.toTimeInMillis(bytes);
+         }
+
+         @Override
+         protected void validateDuration(Duration duration)
+         {
+             // Checks that the duration has no data below days.
+             if (duration.getNanoseconds() != 0)
+                 throw invalidRequest("The floor on %s values cannot be computed for the %s duration as precision is below 1 day",
+                                      SimpleDateType.instance.asCQL3Type(), duration);
+         }
+     };

Review comment:
       nit: unnecessary `;`

##########
File path: src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java
##########
@@ -30,6 +30,18 @@
 {
     public boolean isCalledOnNullInput();
 
+    /**
+     * Checks if the function is monotonic.
+     *
+     *<p>A function is monotonic if it is either entirely nonincreasing or nondecreasing.</p>

Review comment:
       nit: would it make sense to append ", given an ordered set of inputs"?

##########
File path: src/java/org/apache/cassandra/cql3/functions/TimeFcts.java
##########
@@ -202,7 +231,281 @@ public ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> para
 
                 return ByteBufferUtil.bytes(type.toTimeInMillis(bb));
             }
+
+            @Override
+            public boolean isMonotonic()
+            {
+                return true; // this function unconditionally reports itself as monotonic
+            }
         };
     }
-}
+
+    /**
+     * Base class of the {@code floor} functions, which round a time value down to the closest multiple of a
+     * duration.
+     *
+     * <p>The function takes the time and the duration and, optionally, a starting time as third argument.
+     * Subclasses provide the conversions between their serialized time representation and milliseconds.</p>
+     */
+     private static abstract class FloorFunction extends NativeScalarFunction
+     {
+         /** Starting time used when the function is called without a starting time argument. */
+         private static final Long ZERO = Long.valueOf(0);
+
+         /** Number of nanoseconds in one millisecond. */
+         private static final long NANOS_PER_MILLI = 1_000_000L;
+
+         protected FloorFunction(AbstractType<?> returnType,
+                                 AbstractType<?>... argsType)
+         {
+             super("floor", returnType, argsType);
+         }
+
+         /**
+          * A partial application is monotonic only when the time (first argument) is the sole unresolved
+          * argument: the duration and, if present, the starting time must already be bound.
+          */
+         @Override
+         protected boolean isPartialApplicationMonotonic(List<ByteBuffer> partialParameters)
+         {
+             return partialParameters.get(0) == UNRESOLVED
+                     && partialParameters.get(1) != UNRESOLVED
+                     && (partialParameters.size() == 2 || partialParameters.get(2) != UNRESOLVED);
+         }
+
+         /**
+          * Computes the floor of the time for the given duration and starting time.
+          *
+          * @param protocolVersion the protocol version (unused by this function)
+          * @param parameters the time, the duration and, optionally, the starting time
+          * @return the serialized floored time, or {@code null} if any of the inputs is {@code null}
+          */
+         @Override
+         public final ByteBuffer execute(ProtocolVersion protocolVersion, List<ByteBuffer> parameters)
+         {
+             ByteBuffer timeBuffer = parameters.get(0);
+             ByteBuffer durationBuffer = parameters.get(1);
+             Long startingTime = getStartingTime(parameters);
+
+             if (timeBuffer == null || durationBuffer == null || startingTime == null)
+                 return null;
+
+             Long time = toTimeInMillis(timeBuffer);
+             Duration duration = DurationType.instance.compose(durationBuffer);
+
+             if (time == null || duration == null)
+                 return null;
+
+             // The duration must only be validated once it is known to be non-null, otherwise a null
+             // duration would trigger a NullPointerException instead of a null result.
+             validateDuration(duration);
+
+             long floor = Duration.floorTimestamp(time, duration, startingTime);
+
+             return fromTimeInMillis(floor);
+         }
+
+         /**
+          * Returns the time to use as the starting time.
+          *
+          * @param parameters the function parameters
+          * @return the time to use as the starting time: zero if no starting time argument was provided,
+          * {@code null} if the provided starting time is {@code null}
+          */
+         private Long getStartingTime(List<ByteBuffer> parameters)
+         {
+             if (parameters.size() == 3)
+             {
+                 ByteBuffer startingTimeBuffer = parameters.get(2);
+
+                 if (startingTimeBuffer == null)
+                     return null;
+
+                 return toStartingTimeInMillis(startingTimeBuffer);
+             }
+
+             return ZERO;
+         }
+
+         /**
+          * Validates that the duration has the correct precision, throwing the exception built by
+          * {@code invalidRequest} otherwise.
+          * @param duration the duration to validate.
+          */
+         protected void validateDuration(Duration duration)
+         {
+             // Checks that the duration has no data below milliseconds: its number of nanoseconds must be an
+             // exact multiple of one millisecond. Counting trailing zero bits is not sufficient as it only
+             // tests divisibility by a power of two (e.g. 64ns has 6 trailing zero bits).
+             if (duration.getNanoseconds() % NANOS_PER_MILLI != 0)
+                 throw invalidRequest("The floor cannot be computed for the %s duration as precision is below 1 millisecond", duration);
+         }
+
+         /**
+          * Serializes the specified time.
+          *
+          * @param timeInMillis the time in milliseconds
+          * @return the serialized time
+          */
+         protected abstract ByteBuffer fromTimeInMillis(long timeInMillis);
+
+         /**
+          * Deserializes the specified input time.
+          *
+          * @param bytes the serialized time
+          * @return the time in milliseconds
+          */
+         protected abstract Long toTimeInMillis(ByteBuffer bytes);
+
+         /**
+          * Deserializes the specified starting time.
+          *
+          * @param bytes the serialized starting time
+          * @return the starting time in milliseconds
+          */
+         protected abstract Long toStartingTimeInMillis(ByteBuffer bytes);
+     };
+
+    /**
+     * Function that rounds a timestamp down to the closest multiple of a duration.
+     */
+     public static final class FloorTimestampFunction extends FloorFunction
+     {
+         /**
+          * Creates the two-argument variant of the function.
+          *
+          * @return a function taking a timestamp and a duration, and returning a timestamp
+          */
+         public static FloorTimestampFunction newInstance()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance);
+         }
+
+         /**
+          * Creates the three-argument variant of the function, which also accepts a starting time.
+          *
+          * @return a function taking a timestamp, a duration and a starting timestamp, and returning a timestamp
+          */
+         public static FloorTimestampFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimestampFunction(TimestampType.instance,
+                                               TimestampType.instance,
+                                               DurationType.instance,
+                                               TimestampType.instance);
+         }
+
+         private FloorTimestampFunction(AbstractType<?> returnType,
+                                        AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         /**
+          * Serializes the floored time as a timestamp.
+          *
+          * @param timeInMillis the time in milliseconds
+          * @return the serialized timestamp
+          */
+         @Override
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         /**
+          * Deserializes the starting time, which is a timestamp for this function.
+          *
+          * @param bytes the serialized starting timestamp
+          * @return the starting time in milliseconds
+          */
+         @Override
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         /**
+          * Deserializes the input timestamp.
+          *
+          * @param bytes the serialized timestamp
+          * @return the time in milliseconds
+          */
+         @Override
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+     };
+
+     /**
+      * Function that rounds a timeUUID down to the closest multiple of a duration.
+      *
+      * <p>The input is a timeUUID, but the result and the optional starting time are timestamps.</p>
+      */
+     public static final class FloorTimeUuidFunction extends FloorFunction
+     {
+         /**
+          * Creates the two-argument variant of the function.
+          *
+          * @return a function taking a timeUUID and a duration, and returning a timestamp
+          */
+         public static FloorTimeUuidFunction newInstance()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance);
+         }
+
+         /**
+          * Creates the three-argument variant of the function, which also accepts a starting time.
+          *
+          * @return a function taking a timeUUID, a duration and a starting timestamp, and returning a timestamp
+          */
+         public static FloorTimeUuidFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorTimeUuidFunction(TimestampType.instance,
+                                              TimeUUIDType.instance,
+                                              DurationType.instance,
+                                              TimestampType.instance);
+         }
+
+         private FloorTimeUuidFunction(AbstractType<?> returnType,
+                                       AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         /**
+          * Serializes the floored time as a timestamp.
+          *
+          * @param timeInMillis the time in milliseconds
+          * @return the serialized timestamp
+          */
+         @Override
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return TimestampType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         /**
+          * Deserializes the starting time, which is a timestamp for this function.
+          *
+          * @param bytes the serialized starting timestamp
+          * @return the starting time in milliseconds
+          */
+         @Override
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return TimestampType.instance.toTimeInMillis(bytes);
+         }
+
+         /**
+          * Extracts the time from the input timeUUID using {@code UUIDGen.getAdjustedTimestamp}.
+          *
+          * @param bytes the serialized timeUUID
+          * @return the time in milliseconds
+          */
+         @Override
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return UUIDGen.getAdjustedTimestamp(UUIDGen.getUUID(bytes));
+         }
+     };
+
+     /**
+      * Function that rounds a date down to the closest multiple of a duration.
+      */
+     public static final class FloorDateFunction extends FloorFunction
+     {
+         /**
+          * Creates the two-argument variant of the function.
+          *
+          * @return a function taking a date and a duration, and returning a date
+          */
+         public static FloorDateFunction newInstance()
+         {
+             return new FloorDateFunction(SimpleDateType.instance,
+                                          SimpleDateType.instance,
+                                          DurationType.instance);
+         }
+
+         /**
+          * Creates the three-argument variant of the function, which also accepts a starting time.
+          *
+          * @return a function taking a date, a duration and a starting date, and returning a date
+          */
+         public static FloorDateFunction newInstanceWithStartTimeArgument()
+         {
+             return new FloorDateFunction(SimpleDateType.instance,
+                                          SimpleDateType.instance,
+                                          DurationType.instance,
+                                          SimpleDateType.instance);
+         }
+
+         private FloorDateFunction(AbstractType<?> returnType,
+                                   AbstractType<?>... argTypes)
+         {
+             super(returnType, argTypes);
+         }
+
+         /**
+          * Serializes the floored time as a date.
+          *
+          * @param timeInMillis the time in milliseconds
+          * @return the serialized date
+          */
+         @Override
+         protected ByteBuffer fromTimeInMillis(long timeInMillis)
+         {
+             return SimpleDateType.instance.fromTimeInMillis(timeInMillis);
+         }
+
+         /**
+          * Deserializes the starting time, which is a date for this function.
+          *
+          * @param bytes the serialized starting date
+          * @return the starting time in milliseconds
+          */
+         @Override
+         protected Long toStartingTimeInMillis(ByteBuffer bytes)
+         {
+             return SimpleDateType.instance.toTimeInMillis(bytes);
+         }
+
+         /**
+          * Deserializes the input date.
+          *
+          * @param bytes the serialized date
+          * @return the time in milliseconds
+          */
+         @Override
+         protected Long toTimeInMillis(ByteBuffer bytes)
+         {
+             return SimpleDateType.instance.toTimeInMillis(bytes);
+         }
+
+         /**
+          * Validates that the duration has no component below one day, throwing the exception built by
+          * {@code invalidRequest} otherwise.
+          *
+          * @param duration the duration to validate.
+          */
+         @Override
+         protected void validateDuration(Duration duration)
+         {
+             // Checks that the duration has no data below days.
+             if (duration.getNanoseconds() != 0)
+                 throw invalidRequest("The floor on %s values cannot be computed for the %s duration as precision is below 1 day",
+                                      SimpleDateType.instance.asCQL3Type(), duration);
+         }
+     };
+
+     /**
+      * Function that rounds a time down to the closest multiple of a duration.
+      */
+     public static final NativeScalarFunction floorTime = new NativeScalarFunction("floor", TimeType.instance, TimeType.instance, DurationType.instance)

Review comment:
       This shares a lot of similarity with `FloorFunction`. Would it make sense to extend from `FloorFunction`? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org