You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by fe...@apache.org on 2020/05/09 12:23:55 UTC

[calcite] 01/02: [CALCITE-3737] HOP Table Function (Rui Wang)

This is an automated email from the ASF dual-hosted git repository.

fengzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit 40e588de5f999034e5030b12cdbc90f4073808fe
Author: amaliujia <am...@163.com>
AuthorDate: Mon Dec 23 23:48:25 2019 -0800

    [CALCITE-3737] HOP Table Function (Rui Wang)
---
 core/src/main/codegen/config.fmpp                  |  1 +
 core/src/main/codegen/templates/Parser.jj          | 16 +++-
 .../calcite/adapter/enumerable/EnumUtils.java      | 85 +++++++++++++++++++++-
 .../enumerable/EnumerableTableFunctionScan.java    |  6 +-
 .../calcite/adapter/enumerable/RexImpTable.java    | 36 +++++++--
 .../adapter/enumerable/RexToLixTranslator.java     |  8 +-
 ...ntor.java => TableFunctionCallImplementor.java} |  2 +-
 .../apache/calcite/sql/SqlHopTableFunction.java    | 71 ++++++++++++++++++
 .../apache/calcite/sql/SqlTumbleTableFunction.java | 65 +++++++++++++++++
 .../apache/calcite/sql/SqlWindowTableFunction.java | 49 +++----------
 .../calcite/sql/fun/SqlStdOperatorTable.java       | 10 ++-
 .../org/apache/calcite/util/BuiltInMethod.java     |  5 +-
 .../org/apache/calcite/test/SqlValidatorTest.java  | 35 ++++++++-
 .../apache/calcite/test/SqlToRelConverterTest.xml  |  6 +-
 core/src/test/resources/sql/stream.iq              | 38 ++++++++++
 site/_docs/reference.md                            | 17 +++++
 16 files changed, 384 insertions(+), 66 deletions(-)

diff --git a/core/src/main/codegen/config.fmpp b/core/src/main/codegen/config.fmpp
index eb9adff..ecf6dcf 100644
--- a/core/src/main/codegen/config.fmpp
+++ b/core/src/main/codegen/config.fmpp
@@ -145,6 +145,7 @@ data: {
       "GOTO"
       "GRANTED"
       "HIERARCHY"
+      "HOP"
       "HOURS"
       "IGNORE"
       "IMMEDIATE"
diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index 3ea3a5a..b0c1395 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -6052,10 +6052,17 @@ SqlCall GroupByWindowingCall():
     final List<SqlNode> args;
 }
 {
-    <TUMBLE> { s = span(); }
-    args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
-        return SqlStdOperatorTable.TUMBLE.createCall(s.end(this), args);
-    }
+    (
+        <TUMBLE> { s = span(); }
+        args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+            return SqlStdOperatorTable.TUMBLE.createCall(s.end(this), args);
+        }
+    |
+        <HOP> { s = span(); }
+        args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+            return SqlStdOperatorTable.HOP.createCall(s.end(this), args);
+        }
+    )
 }
 
 SqlCall MatchRecognizeFunctionCall() :
@@ -6956,6 +6963,7 @@ SqlPostfixOperator PostfixRowOperator() :
 |   < HAVING: "HAVING" >
 |   < HIERARCHY: "HIERARCHY" >
 |   < HOLD: "HOLD" >
+|   < HOP: "HOP" >
 |   < HOUR: "HOUR" >
 |   < HOURS: "HOURS" >
 |   < IDENTITY: "IDENTITY" >
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index 6fca91d..b50a832 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -17,6 +17,9 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.JoinType;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.linq4j.function.Function1;
@@ -56,6 +59,7 @@ import java.math.BigDecimal;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -746,7 +750,7 @@ public class EnumUtils {
 
   /** Generates a window selector which appends attribute of the window based on
    * the parameters. */
-  static Expression windowSelector(
+  static Expression tumblingWindowSelector(
       PhysType inputPhysType,
       PhysType outputPhysType,
       Expression wmColExpr,
@@ -796,4 +800,83 @@ public class EnumUtils {
         outputPhysType.record(expressions),
         parameter);
   }
+
+  /**
+   * Create enumerable implementation that applies hopping on each element from the input
+   * enumerator and produces at least one element for each input element.
+   */
+  public static Enumerable<Object[]> hopping(Enumerator<Object[]> inputEnumerator,
+      int indexOfWatermarkedColumn, long emitFrequency, long intervalSize) {
+    return new AbstractEnumerable<Object[]>() {
+      @Override public Enumerator<Object[]> enumerator() {
+        return new HopEnumerator(inputEnumerator,
+            indexOfWatermarkedColumn, emitFrequency, intervalSize);
+      }
+    };
+  }
+
+  private static class HopEnumerator implements Enumerator<Object[]> {
+    private final Enumerator<Object[]> inputEnumerator;
+    private final int indexOfWatermarkedColumn;
+    private final long emitFrequency;
+    private final long intervalSize;
+    private LinkedList<Object[]> list;
+
+    HopEnumerator(Enumerator<Object[]> inputEnumerator,
+        int indexOfWatermarkedColumn, long emitFrequency, long intervalSize) {
+      this.inputEnumerator = inputEnumerator;
+      this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+      this.emitFrequency = emitFrequency;
+      this.intervalSize = intervalSize;
+      list = new LinkedList<>();
+    }
+
+    public Object[] current() {
+      if (list.size() > 0) {
+        return takeOne();
+      } else {
+        Object[] current = inputEnumerator.current();
+        List<Pair> windows = hopWindows(SqlFunctions.toLong(current[indexOfWatermarkedColumn]),
+            emitFrequency, intervalSize);
+        for (Pair window : windows) {
+          Object[] curWithWindow = new Object[current.length + 2];
+          System.arraycopy(current, 0, curWithWindow, 0, current.length);
+          curWithWindow[current.length] = window.left;
+          curWithWindow[current.length + 1] = window.right;
+          list.offer(curWithWindow);
+        }
+        return takeOne();
+      }
+    }
+
+    public boolean moveNext() {
+      if (list.size() > 0) {
+        return true;
+      }
+      return inputEnumerator.moveNext();
+    }
+
+    public void reset() {
+      inputEnumerator.reset();
+      list.clear();
+    }
+
+    public void close() {
+    }
+
+    private Object[] takeOne() {
+      return list.pollFirst();
+    }
+  }
+
+  private static List<Pair> hopWindows(long tsMillis, long periodMillis, long sizeMillis) {
+    ArrayList<Pair> ret = new ArrayList<>(Math.toIntExact(sizeMillis / periodMillis));
+    long lastStart = tsMillis - ((tsMillis + periodMillis) % periodMillis);
+    for (long start = lastStart;
+         start > tsMillis - sizeMillis;
+         start -= periodMillis) {
+      ret.add(new Pair(start, start + sizeMillis));
+    }
+    return ret;
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java
index 677b1dd..bd3c7ff 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java
@@ -69,7 +69,7 @@ public class EnumerableTableFunctionScan extends TableFunctionScan
     if (isImplementorDefined((RexCall) getCall())) {
       return tvfImplementorBasedImplement(implementor, pref);
     } else {
-      return defaultTableValuedFunctionImplement(implementor, pref);
+      return defaultTableFunctionImplement(implementor, pref);
     }
   }
 
@@ -100,7 +100,7 @@ public class EnumerableTableFunctionScan extends TableFunctionScan
     return QueryableTable.class.isAssignableFrom(method.getReturnType());
   }
 
-  private Result defaultTableValuedFunctionImplement(
+  private Result defaultTableFunctionImplement(
       EnumerableRelImplementor implementor, Prefer pref) {
     BlockBuilder bb = new BlockBuilder();
     // Non-array user-specified types are not supported yet
@@ -142,7 +142,7 @@ public class EnumerableTableFunctionScan extends TableFunctionScan
             SqlConformanceEnum.DEFAULT);
 
     builder.add(
-        RexToLixTranslator.translateTableValuedFunction(
+        RexToLixTranslator.translateTableFunction(
             typeFactory,
             conformance,
             builder,
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index e8c5995..77a366f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -184,6 +184,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING_ID;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.HOP_TVF;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INITCAP;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INTERSECTION;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_A_SET;
@@ -312,7 +313,7 @@ public class RexImpTable {
       new HashMap<>();
   private final Map<SqlMatchFunction, Supplier<? extends MatchImplementor>> matchMap =
       new HashMap<>();
-  private final Map<SqlOperator, Supplier<? extends TableValuedFunctionCallImplementor>>
+  private final Map<SqlOperator, Supplier<? extends TableFunctionCallImplementor>>
       tvfImplementorMap = new HashMap<>();
 
   RexImpTable() {
@@ -673,6 +674,7 @@ public class RexImpTable {
     matchMap.put(LAST, LastImplementor::new);
     map.put(PREV, new PrevImplementor());
     tvfImplementorMap.put(TUMBLE_TVF, TumbleImplementor::new);
+    tvfImplementorMap.put(HOP_TVF, HopImplementor::new);
   }
 
   private <T> Supplier<T> constructorSupplier(Class<T> klass) {
@@ -969,8 +971,8 @@ public class RexImpTable {
     }
   }
 
-  public TableValuedFunctionCallImplementor get(final SqlWindowTableFunction operator) {
-    final Supplier<? extends TableValuedFunctionCallImplementor> supplier =
+  public TableFunctionCallImplementor get(final SqlWindowTableFunction operator) {
+    final Supplier<? extends TableFunctionCallImplementor> supplier =
         tvfImplementorMap.get(operator);
     if (supplier != null) {
       return supplier.get();
@@ -3205,7 +3207,7 @@ public class RexImpTable {
   }
 
   /** Implements tumbling. */
-  private static class TumbleImplementor implements TableValuedFunctionCallImplementor {
+  private static class TumbleImplementor implements TableFunctionCallImplementor {
     @Override public Expression implement(RexToLixTranslator translator,
         Expression inputEnumerable,
         RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
@@ -3225,11 +3227,35 @@ public class RexImpTable {
       return Expressions.call(
           BuiltInMethod.TUMBLING.method,
           inputEnumerable,
-          EnumUtils.windowSelector(
+          EnumUtils.tumblingWindowSelector(
               inputPhysType,
               outputPhysType,
               translatedOperands.get(0),
               translatedOperands.get(1)));
     }
   }
+
+  /** Implements hopping. */
+  private static class HopImplementor implements TableFunctionCallImplementor {
+    @Override public Expression implement(RexToLixTranslator translator,
+        Expression inputEnumerable, RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
+      Expression intervalExpression = translator.translate(call.getOperands().get(2));
+      Expression intervalExpression2 = translator.translate(call.getOperands().get(3));
+      RexCall descriptor = (RexCall) call.getOperands().get(1);
+      List<Expression> translatedOperands = new ArrayList<>();
+      Expression wmColIndexExpr =
+          Expressions.constant(((RexInputRef) descriptor.getOperands().get(0)).getIndex());
+      translatedOperands.add(wmColIndexExpr);
+      translatedOperands.add(intervalExpression);
+      translatedOperands.add(intervalExpression2);
+
+      return Expressions.call(
+          BuiltInMethod.HOPPING.method,
+          Expressions.list(
+              Expressions.call(inputEnumerable, BuiltInMethod.ENUMERABLE_ENUMERATOR.method),
+              wmColIndexExpr,
+              intervalExpression,
+              intervalExpression2));
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
index 7bb3123..934f982 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
@@ -168,14 +168,14 @@ public class RexToLixTranslator {
         .translateList(program.getProjectList(), storageTypes);
   }
 
-  public static Expression translateTableValuedFunction(JavaTypeFactory typeFactory,
+  public static Expression translateTableFunction(JavaTypeFactory typeFactory,
       SqlConformance conformance, BlockBuilder blockBuilder,
       Expression root, RexCall rexCall, Expression inputEnumerable,
       PhysType inputPhysType, PhysType outputPhysType) {
     return new RexToLixTranslator(null, typeFactory, root, null,
         blockBuilder, Collections.emptyMap(), new RexBuilder(typeFactory), conformance,
         null, null)
-        .translateTableValuedFunction(rexCall, inputEnumerable, inputPhysType, outputPhysType);
+        .translateTableFunction(rexCall, inputEnumerable, inputPhysType, outputPhysType);
   }
 
   /** Creates a translator for translating aggregate functions. */
@@ -946,10 +946,10 @@ public class RexToLixTranslator {
     return list;
   }
 
-  private Expression translateTableValuedFunction(RexCall rexCall, Expression inputEnumerable,
+  private Expression translateTableFunction(RexCall rexCall, Expression inputEnumerable,
       PhysType inputPhysType, PhysType outputPhysType) {
     assert rexCall.getOperator() instanceof SqlWindowTableFunction;
-    TableValuedFunctionCallImplementor implementor =
+    TableFunctionCallImplementor implementor =
         RexImpTable.INSTANCE.get((SqlWindowTableFunction) rexCall.getOperator());
     if (implementor == null) {
       throw Util.needToImplement("implementor of " + rexCall.getOperator().getName());
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/TableValuedFunctionCallImplementor.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/TableFunctionCallImplementor.java
similarity index 96%
rename from core/src/main/java/org/apache/calcite/adapter/enumerable/TableValuedFunctionCallImplementor.java
rename to core/src/main/java/org/apache/calcite/adapter/enumerable/TableFunctionCallImplementor.java
index 5a2d6a4..5492f06 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/TableValuedFunctionCallImplementor.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/TableFunctionCallImplementor.java
@@ -24,7 +24,7 @@ import org.apache.calcite.rex.RexCall;
  * Implements a table-valued function call.
  */
 @Experimental
-public interface TableValuedFunctionCallImplementor {
+public interface TableFunctionCallImplementor {
   /**
    * Implements a table-valued function call.
    *
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
new file mode 100644
index 0000000..d32b909
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+/**
+ * SqlHopTableFunction implements an operator for hopping. It allows four parameters:
+ * 1. a table.
+ * 2. a descriptor to provide a watermarked column name from the input table.
+ * 3. an interval parameter to specify the length of window shifting.
+ * 4. an interval parameter to specify the length of window size.
+ */
+public class SqlHopTableFunction extends SqlWindowTableFunction {
+  public SqlHopTableFunction() {
+    super(SqlKind.HOP.name());
+  }
+
+  @Override public SqlOperandCountRange getOperandCountRange() {
+    return SqlOperandCountRanges.of(4);
+  }
+
+  @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
+      boolean throwOnFailure) {
+    // There should only be three operands, and number of operands are checked before
+    // this call.
+    final SqlNode operand0 = callBinding.operand(0);
+    final SqlValidator validator = callBinding.getValidator();
+    final RelDataType type = validator.getValidatedNodeType(operand0);
+    if (type.getSqlTypeName() != SqlTypeName.ROW) {
+      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    }
+    final SqlNode operand1 = callBinding.operand(1);
+    if (operand1.getKind() != SqlKind.DESCRIPTOR) {
+      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    }
+    validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
+    final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
+    if (!SqlTypeUtil.isInterval(type2)) {
+      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    }
+    final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
+    if (!SqlTypeUtil.isInterval(type3)) {
+      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    }
+    return true;
+  }
+
+  @Override public String getAllowedSignatures(String opNameToUse) {
+    return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), "
+        + "datetime interval, datetime interval)";
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
new file mode 100644
index 0000000..e3a5001
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+/**
+ * SqlTumbleTableFunction implements an operator for tumbling. It allows three parameters:
+ * 1. a table.
+ * 2. a descriptor to provide a watermarked column name from the input table.
+ * 3. an interval parameter to specify the length of window size.
+ */
+public class SqlTumbleTableFunction extends SqlWindowTableFunction {
+  public SqlTumbleTableFunction() {
+    super(SqlKind.TUMBLE.name());
+  }
+
+  @Override public SqlOperandCountRange getOperandCountRange() {
+    return SqlOperandCountRanges.of(3);
+  }
+
+  @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
+      boolean throwOnFailure) {
+    // There should only be three operands, and number of operands are checked before
+    // this call.
+    final SqlNode operand0 = callBinding.operand(0);
+    final SqlValidator validator = callBinding.getValidator();
+    final RelDataType type = validator.getValidatedNodeType(operand0);
+    if (type.getSqlTypeName() != SqlTypeName.ROW) {
+      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    }
+    final SqlNode operand1 = callBinding.operand(1);
+    if (operand1.getKind() != SqlKind.DESCRIPTOR) {
+      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    }
+    validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
+    final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
+    if (!SqlTypeUtil.isInterval(type2)) {
+      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    }
+    return true;
+  }
+
+  @Override public String getAllowedSignatures(String opNameToUse) {
+    return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval)";
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
index f72a260..e86551b 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
@@ -20,10 +20,8 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.sql.type.SqlOperandCountRanges;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.SqlValidator;
 
 import java.util.ArrayList;
@@ -44,28 +42,21 @@ public class SqlWindowTableFunction extends SqlFunction {
         SqlFunctionCategory.SYSTEM);
   }
 
-  @Override public SqlOperandCountRange getOperandCountRange() {
-    return SqlOperandCountRanges.of(3);
-  }
-
-  @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
+  protected boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
       boolean throwOnFailure) {
-    // There should only be three operands, and number of operands are checked before
-    // this call.
-    final SqlNode operand0 = callBinding.operand(0);
-    final SqlValidator validator = callBinding.getValidator();
-    final RelDataType type = validator.getValidatedNodeType(operand0);
-    if (type.getSqlTypeName() != SqlTypeName.ROW) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
-    }
-    final SqlNode operand1 = callBinding.operand(1);
-    if (operand1.getKind() != SqlKind.DESCRIPTOR) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+    if (throwOnFailure) {
+      throw callBinding.newValidationSignatureError();
+    } else {
+      return false;
     }
-    for (SqlNode descOperand: ((SqlCall) operand1).getOperandList()) {
+  }
+
+  protected void validateColumnNames(SqlValidator validator,
+      List<String> fieldNames, List<SqlNode> unvalidatedColumnNames) {
+    for (SqlNode descOperand: unvalidatedColumnNames) {
       final String colName = ((SqlIdentifier) descOperand).getSimple();
       boolean matches = false;
-      for (String field : type.getFieldNames()) {
+      for (String field : fieldNames) {
         if (validator.getCatalogReader().nameMatcher().matches(field, colName)) {
           matches = true;
           break;
@@ -76,24 +67,6 @@ public class SqlWindowTableFunction extends SqlFunction {
             RESOURCE.unknownIdentifier(colName));
       }
     }
-    final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
-    if (!SqlTypeUtil.isInterval(type2)) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
-    }
-    return true;
-  }
-
-  private boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
-      boolean throwOnFailure) {
-    if (throwOnFailure) {
-      throw callBinding.newValidationSignatureError();
-    } else {
-      return false;
-    }
-  }
-
-  @Override public String getAllowedSignatures(String opNameToUse) {
-    return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval)";
   }
 
   /**
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
index 23fc9c3..526df03 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
@@ -27,6 +27,7 @@ import org.apache.calcite.sql.SqlFilterOperator;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlGroupedWindowFunction;
+import org.apache.calcite.sql.SqlHopTableFunction;
 import org.apache.calcite.sql.SqlInternalOperator;
 import org.apache.calcite.sql.SqlJsonConstructorNullClause;
 import org.apache.calcite.sql.SqlKind;
@@ -47,11 +48,11 @@ import org.apache.calcite.sql.SqlSampleSpec;
 import org.apache.calcite.sql.SqlSetOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlTumbleTableFunction;
 import org.apache.calcite.sql.SqlUnnestOperator;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlValuesOperator;
 import org.apache.calcite.sql.SqlWindow;
-import org.apache.calcite.sql.SqlWindowTableFunction;
 import org.apache.calcite.sql.SqlWithinGroupOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.type.InferTypes;
@@ -2294,7 +2295,10 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
   public static final SqlOperator DESCRIPTOR = new SqlDescriptorOperator();
 
   /** TUMBLE as a table-value function. */
-  public static final SqlFunction TUMBLE_TVF = new SqlWindowTableFunction(SqlKind.TUMBLE.name());
+  public static final SqlFunction TUMBLE_TVF = new SqlTumbleTableFunction();
+
+  /** HOP as a table-value function. */
+  public static final SqlFunction HOP_TVF = new SqlHopTableFunction();
 
   /** The {@code TUMBLE} group function.
    *
@@ -2332,7 +2336,7 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
 
   /** The {@code HOP} group function. */
   public static final SqlGroupedWindowFunction HOP =
-      new SqlGroupedWindowFunction(SqlKind.HOP.name(), SqlKind.HOP, null,
+      new SqlGroupedWindowFunction("$HOP", SqlKind.HOP, null,
           ReturnTypes.ARG0, null,
           OperandTypes.or(OperandTypes.DATETIME_INTERVAL_INTERVAL,
               OperandTypes.DATETIME_INTERVAL_INTERVAL_TIME),
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 8bfdee5..fc60dcc 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -20,6 +20,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.enumerable.AggregateLambdaFactory;
 import org.apache.calcite.adapter.enumerable.BasicAggregateLambdaFactory;
 import org.apache.calcite.adapter.enumerable.BasicLazyAccumulator;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
 import org.apache.calcite.adapter.enumerable.LazyAggregateLambdaFactory;
 import org.apache.calcite.adapter.enumerable.MatchUtils;
 import org.apache.calcite.adapter.enumerable.SourceSorter;
@@ -591,7 +592,9 @@ public enum BuiltInMethod {
       "resultSelector", Function2.class),
   AGG_LAMBDA_FACTORY_ACC_SINGLE_GROUP_RESULT_SELECTOR(AggregateLambdaFactory.class,
       "singleGroupResultSelector", Function1.class),
-  TUMBLING(EnumerableDefaults.class, "tumbling", Enumerable.class, Function1.class);
+  TUMBLING(EnumerableDefaults.class, "tumbling", Enumerable.class, Function1.class),
+  HOPPING(EnumUtils.class, "hopping", Enumerator.class, int.class, long.class,
+      long.class);
 
   public final Method method;
   public final Constructor constructor;
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index f361759..241898e 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10214,7 +10214,7 @@ class SqlValidatorTest extends SqlValidatorTestCase {
         .fails("Unknown identifier 'COLUMN_NOT_EXIST'");
   }
 
-  @Test void testTumbleTableValuedFunction() {
+  @Test public void testTumbleTableFunction() {
     sql("select * from table(\n"
         + "^tumble(table orders, descriptor(rowtime), interval '2' hour, 'test')^)")
         .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
@@ -10237,7 +10237,36 @@ class SqlValidatorTest extends SqlValidatorTestCase {
         .fails("Object 'TABLER_NOT_EXIST' not found");
   }
 
-  @Test void testStreamTumble() {
+  @Test public void testHopTableFunction() {
+    sql("select * from table(\n"
+        + "hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour))").ok();
+    sql("select * from table(\n"
+        + "^hop(table orders, descriptor(rowtime), interval '2' hour)^)")
+        .fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments");
+    sql("select * from table(\n"
+        + "^hop(table orders, descriptor(rowtime), interval '2' hour, 'test')^)")
+        .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+            + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
+            + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
+            + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+    sql("select * from table(\n"
+        + "^hop(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
+        .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+            + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
+            + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
+            + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+    sql("select * from table(\n"
+        + "^hop(table orders, 'test', interval '2' hour, interval '2' hour)^)")
+        .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+            + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <INTERVAL HOUR>, "
+            + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
+            + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+    sql("select * from table(\n"
+        + "hop(TABLE ^tabler_not_exist^, descriptor(rowtime), interval '2' hour, interval '1' hour))")
+        .fails("Object 'TABLER_NOT_EXIST' not found");
+  }
+
+  @Test public void testStreamTumble() {
     // TUMBLE
     sql("select stream tumble_end(rowtime, interval '2' hour) as rowtime\n"
         + "from orders\n"
@@ -10297,7 +10326,7 @@ class SqlValidatorTest extends SqlValidatorTestCase {
         + "from orders\n"
         + "group by hop(rowtime, interval '1' hour, interval '3' hour)")
         .fails("Call to auxiliary group function 'HOP_START' must have "
-            + "matching call to group function 'HOP' in GROUP BY clause");
+            + "matching call to group function '\\$HOP' in GROUP BY clause");
     // HOP with align
     sql("select stream\n"
         + "  hop_start(rowtime, interval '1' hour, interval '3' hour,\n"
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 880eefe..c19e32a 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -3537,7 +3537,7 @@ group by hop(rowtime, interval '1' hour, interval '3' hour)]]>
 LogicalDelta
   LogicalProject(ROWTIME=[$0], C=[$1])
     LogicalAggregate(group=[{0}], C=[COUNT()])
-      LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 10800000:INTERVAL HOUR)])
+      LogicalProject($f0=[$HOP($0, 3600000:INTERVAL HOUR, 10800000:INTERVAL HOUR)])
         LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
 ]]>
         </Resource>
@@ -4917,7 +4917,7 @@ LogicalDelta
 ]]>
         </Resource>
     </TestCase>
-    <TestCase name="testTableValuedFunctionTumble">
+    <TestCase name="testTableFunctionTumble">
         <Resource name="sql">
             <![CDATA[select *
 from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE))]]>
@@ -4931,7 +4931,7 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
 ]]>
         </Resource>
     </TestCase>
-    <TestCase name="testTableValuedFunctionTumbleWithSubQueryParam">
+    <TestCase name="testTableFunctionTumbleWithSubQueryParam">
         <Resource name="sql">
             <![CDATA[select *
 from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE))]]>
diff --git a/core/src/test/resources/sql/stream.iq b/core/src/test/resources/sql/stream.iq
index 18ac66e..15f5832 100644
--- a/core/src/test/resources/sql/stream.iq
+++ b/core/src/test/resources/sql/stream.iq
@@ -44,3 +44,41 @@ SELECT * FROM TABLE(TUMBLE((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), INTERVAL
 (5 rows)
 
 !ok
+
+SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:10:00 | 2015-02-15 10:20:00 |
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:50:00 | 2015-02-15 11:00:00 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:55:00 | 2015-02-15 11:05:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 11:05:00 | 2015-02-15 11:15:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 11:10:00 | 2015-02-15 11:20:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(10 rows)
+
+!ok
+
+SELECT * FROM TABLE(HOP((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:10:00 | 2015-02-15 10:20:00 |
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:50:00 | 2015-02-15 11:00:00 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:55:00 | 2015-02-15 11:05:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 11:05:00 | 2015-02-15 11:15:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 11:10:00 | 2015-02-15 11:20:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(10 rows)
+
+!ok
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index a223aa7..f8aa8ec 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -580,6 +580,7 @@ GRANTED,
 **HAVING**,
 HIERARCHY,
 **HOLD**,
+HOP,
 **HOUR**,
 HOURS,
 **IDENTITY**,
@@ -1882,7 +1883,23 @@ Here is an example:
 will apply tumbling with 1 minute window size on rows from table orders. rowtime is the
 watermarked column of table orders that tells data completeness.
 
+#### HOP
+In streaming queries, HOP assigns windows that cover rows within the interval of *size*, shifting every *slide*, 
+and optionally aligned at *time* based on a timestamp column. Windows assigned could have overlapping so hopping 
+sometime is named as "sliding windowing".  
+
+
+| Operator syntax      | Description
+|:-------------------- |:-----------
+| HOP(table, DESCRIPTOR(column_name), slide, size, [, time ]) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide*, and optionally aligned at *time*. Hopping is applied on table in which there is a watermarked column specified by descriptor.
+
+Here is an example:
+`SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))`,
+will apply hopping with 5-minute interval size on rows from table orders, shifting every 2 minutes. rowtime is the
+watermarked column of table orders that tells data completeness.
+
 ### Grouped window functions
+**warning**: grouped window functions are deprecated.
 
 Grouped window functions occur in the `GROUP BY` clause and define a key value
 that represents a window containing several rows.