You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by da...@apache.org on 2020/08/18 08:53:36 UTC

[calcite] 01/01: [CALCITE-4171] Support named parameters for table window functions

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit e84f635020a6c3653b4f1456d64edb86efb2fea8
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Mon Aug 17 13:44:41 2020 +0800

    [CALCITE-4171] Support named parameters for table window functions
    
    * Changes SqlArgumentAssignmentOperator to allow non-scalar query as operand
    * In SqlCallBinding, matches the permuted operand by name with name matcher
    * Refactor SqlWindowTableFunction and its sub-class to reuse same logic
    * Do not patch up the SqlWindowTableFunction with DEFAULTs during SQL validation
---
 .../calcite/adapter/enumerable/EnumUtils.java      |   1 -
 .../apache/calcite/runtime/CalciteResource.java    |   4 +
 .../org/apache/calcite/sql/SqlCallBinding.java     |  45 +++++++--
 .../apache/calcite/sql/SqlHopTableFunction.java    |  88 ++++++++--------
 .../calcite/sql/SqlSessionTableFunction.java       |  78 ++++++++------
 .../apache/calcite/sql/SqlTumbleTableFunction.java |  82 ++++++++-------
 .../apache/calcite/sql/SqlWindowTableFunction.java |  77 +++++++++++++-
 .../sql/fun/SqlArgumentAssignmentOperator.java     |   4 +
 .../calcite/sql/validate/SqlValidatorImpl.java     |   8 +-
 .../calcite/runtime/CalciteResource.properties     |   1 +
 .../apache/calcite/test/SqlToRelConverterTest.java |  64 ++++++++++++
 .../org/apache/calcite/test/SqlValidatorTest.java  |  98 ++++++++++++++++--
 .../apache/calcite/test/SqlToRelConverterTest.xml  | 112 +++++++++++++++++++++
 core/src/test/resources/sql/stream.iq              |  61 +++++++++++
 site/_docs/reference.md                            |  63 ++++++++++--
 15 files changed, 646 insertions(+), 140 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index 87fd196..ae9455f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -804,7 +804,6 @@ public class EnumUtils {
       expressions.add(expression);
     }
     final Expression wmColExprToLong = EnumUtils.convert(wmColExpr, long.class);
-    final Expression shiftExpr = Expressions.constant(1, long.class);
 
     // Find the fixed window for a timestamp given a window size and an offset, and return the
     // window start.
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 7987546..3aaec91 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -220,6 +220,10 @@ public interface CalciteResource {
   @BaseMessage("Column ''{0}'' is ambiguous")
   ExInst<SqlValidatorException> columnAmbiguous(String a0);
 
+  @BaseMessage("Param ''{0}'' not found in function ''{1}''; did you mean ''{2}''?")
+  ExInst<SqlValidatorException> paramNotFoundInFunctionDidYouMean(String a0,
+      String a1, String a2);
+
   @BaseMessage("Operand {0} must be a query")
   ExInst<SqlValidatorException> needQueryOp(String a0);
 
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java b/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java
index d944812..ea6dad8 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java
@@ -27,6 +27,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SelectScope;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -34,6 +35,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.ImmutableNullableList;
 import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -161,17 +163,42 @@ public class SqlCallBinding extends SqlOperatorBinding {
    * formal parameters of the function. */
   private List<SqlNode> permutedOperands(final SqlCall call) {
     final SqlFunction operator = (SqlFunction) call.getOperator();
-    return Lists.transform(operator.getParamNames(), paramName -> {
-      for (SqlNode operand2 : call.getOperandList()) {
-        final SqlCall call2 = (SqlCall) operand2;
-        assert operand2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT;
-        final SqlIdentifier id = call2.operand(1);
-        if (id.getSimple().equals(paramName)) {
-          return call2.operand(0);
+    final List<String> paramNames = operator.getParamNames();
+    final List<SqlNode> permuted = new ArrayList<>();
+    final SqlNameMatcher nameMatcher = validator.getCatalogReader().nameMatcher();
+    for (final String paramName : paramNames) {
+      Pair<String, SqlIdentifier> args = null;
+      for (int j = 0; j < call.getOperandList().size(); j++) {
+        final SqlCall call2 = call.operand(j);
+        assert call2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT;
+        final SqlIdentifier operandID = call2.operand(1);
+        final String operandName = operandID.getSimple();
+        if (nameMatcher.matches(operandName, paramName)) {
+          permuted.add(call2.operand(0));
+          break;
+        } else if (args == null
+            && nameMatcher.isCaseSensitive()
+            && operandName.equalsIgnoreCase(paramName)) {
+          args = Pair.of(paramName, operandID);
+        }
+        // Reached the last operand and there is still no match.
+        if (j == call.getOperandList().size() - 1) {
+          if (args != null) {
+            throw SqlUtil.newContextException(args.right.getParserPosition(),
+                RESOURCE.paramNotFoundInFunctionDidYouMean(args.right.getSimple(),
+                    call.getOperator().getName(), args.left));
+          }
+          if (!(operator instanceof SqlWindowTableFunction)) {
+            // Unlike user-defined functions, window table functions do not have
+            // their missing operands patched up with DEFAULT (later converted to
+            // nulls during sql-to-rel conversion). Thus, the optional operands need
+            // not appear in the plan, and code generation need not check them for null.
+            permuted.add(DEFAULT_CALL);
+          }
         }
       }
-      return DEFAULT_CALL;
-    });
+    }
+    return permuted;
   }
 
   /**
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
index f5936ae..816140b 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -16,60 +16,68 @@
  */
 package org.apache.calcite.sql;
 
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeUtil;
-import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
 
 /**
- * SqlHopTableFunction implements an operator for hopping. It allows four parameters:
- * 1. a table;
- * 2. a descriptor to provide a watermarked column name from the input table;
- * 3. an interval parameter to specify the length of window shifting;
- * 4. an interval parameter to specify the length of window size.
+ * SqlHopTableFunction implements an operator for hopping.
+ *
+ * <p>It allows four parameters:
+ *
+ * <ol>
+ *   <li>a table</li>
+ *   <li>a descriptor to provide a watermarked column name from the input table</li>
+ *   <li>an interval parameter to specify the length of window shifting</li>
+ *   <li>an interval parameter to specify the length of window size</li>
+ * </ol>
  */
 public class SqlHopTableFunction extends SqlWindowTableFunction {
   public SqlHopTableFunction() {
-    super(SqlKind.HOP.name());
+    super(SqlKind.HOP.name(), OperandTypeCheckerImpl.INSTANCE);
   }
 
-  @Override public SqlOperandCountRange getOperandCountRange() {
-    return SqlOperandCountRanges.between(4, 5);
+  @Override public List<String> getParamNames() {
+    return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SLIDE, PARAM_SIZE, PARAM_OFFSET);
   }
 
-  @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
-      boolean throwOnFailure) {
-    final SqlNode operand0 = callBinding.operand(0);
-    final SqlValidator validator = callBinding.getValidator();
-    final RelDataType type = validator.getValidatedNodeType(operand0);
-    if (type.getSqlTypeName() != SqlTypeName.ROW) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
-    }
-    final SqlNode operand1 = callBinding.operand(1);
-    if (operand1.getKind() != SqlKind.DESCRIPTOR) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /** Operand type checker for HOP. */
+  private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
+    static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();
+
+    @Override public boolean checkOperandTypes(
+        SqlCallBinding callBinding, boolean throwOnFailure) {
+      if (!validateTableWithFollowingDescriptors(callBinding, 1)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+      }
+      if (!validateTailingIntervals(callBinding, 2)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+      }
+      return true;
     }
-    validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
-    final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
-    if (!SqlTypeUtil.isInterval(type2)) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+    @Override public SqlOperandCountRange getOperandCountRange() {
+      return SqlOperandCountRanges.between(4, 5);
     }
-    final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
-    if (!SqlTypeUtil.isInterval(type3)) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+    @Override public String getAllowedSignatures(SqlOperator op, String opName) {
+      return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
+          + "datetime interval, datetime interval[, datetime interval])";
     }
-    if (callBinding.getOperandCount() > 4) {
-      final RelDataType type4 = validator.getValidatedNodeType(callBinding.operand(4));
-      if (!SqlTypeUtil.isInterval(type4)) {
-        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
-      }
+
+    @Override public Consistency getConsistency() {
+      return Consistency.NONE;
     }
-    return true;
-  }
 
-  @Override public String getAllowedSignatures(String opNameToUse) {
-    return getName() + "(TABLE table_name, DESCRIPTOR(col), "
-        + "datetime interval, datetime interval[, datetime interval])";
+    @Override public boolean isOptional(int i) {
+      return i == 4;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
index fef50cc..61a2ba6 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
@@ -18,54 +18,70 @@ package org.apache.calcite.sql;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.SqlValidator;
 
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
 /**
  * SqlSessionTableFunction implements an operator for per-key sessionization. It allows
  * four parameters:
- * 1. a table.
- * 2. a descriptor to provide a watermarked column name from the input table.
- * 3. a descriptor to provide a column as key, on which sessionization will be applied.
- * 4. an interval parameter to specify a inactive activity gap to break sessions.
+ *
+ * <ol>
+ *   <li>table as data source</li>
+ *   <li>a descriptor to provide a watermarked column name from the input table</li>
+ *   <li>a descriptor to provide a column as key, on which sessionization will be applied</li>
+ *   <li>an interval parameter to specify an inactivity gap to break sessions</li>
+ * </ol>
  */
 public class SqlSessionTableFunction extends SqlWindowTableFunction {
   public SqlSessionTableFunction() {
-    super(SqlKind.SESSION.name());
+    super(SqlKind.SESSION.name(), OperandTypeCheckerImpl.INSTANCE);
   }
 
-  @Override public SqlOperandCountRange getOperandCountRange() {
-    return SqlOperandCountRanges.of(4);
+  @Override public List<String> getParamNames() {
+    return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_KEY, PARAM_SIZE);
   }
 
-  @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
-      boolean throwOnFailure) {
-    final SqlNode operand0 = callBinding.operand(0);
-    final SqlValidator validator = callBinding.getValidator();
-    final RelDataType type = validator.getValidatedNodeType(operand0);
-    if (type.getSqlTypeName() != SqlTypeName.ROW) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /** Operand type checker for SESSION. */
+  private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
+    static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();
+
+    @Override public boolean checkOperandTypes(
+        SqlCallBinding callBinding, boolean throwOnFailure) {
+      final SqlValidator validator = callBinding.getValidator();
+      if (!validateTableWithFollowingDescriptors(callBinding, 2)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+      }
+      final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
+      if (!SqlTypeUtil.isInterval(type3)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+      }
+      return true;
     }
-    final SqlNode operand1 = callBinding.operand(1);
-    if (operand1.getKind() != SqlKind.DESCRIPTOR) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+    @Override public SqlOperandCountRange getOperandCountRange() {
+      return SqlOperandCountRanges.of(4);
     }
-    validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
-    final SqlNode operand2 = callBinding.operand(2);
-    if (operand2.getKind() != SqlKind.DESCRIPTOR) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+    @Override public String getAllowedSignatures(SqlOperator op, String opName) {
+      return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
+          + "DESCRIPTOR(key), datetime interval)";
     }
-    validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand2).getOperandList());
-    final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
-    if (!SqlTypeUtil.isInterval(type3)) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+    @Override public Consistency getConsistency() {
+      return Consistency.NONE;
     }
-    return true;
-  }
 
-  @Override public String getAllowedSignatures(String opNameToUse) {
-    return getName() + "(TABLE table_name, DESCRIPTOR(col), "
-        + "DESCRIPTOR(col), datetime interval)";
+    @Override public boolean isOptional(int i) {
+      return false;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
index 75f746d..f32f13d 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
@@ -16,59 +16,73 @@
  */
 package org.apache.calcite.sql;
 
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeUtil;
-import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
 
 /**
  * SqlTumbleTableFunction implements an operator for tumbling.
  *
  * <p>It allows three parameters:
- * 1. a table;
- * 2. a descriptor to provide a watermarked column name from the input table;
- * 3. an interval parameter to specify the length of window size.
+ *
+ * <ol>
+ *   <li>a table</li>
+ *   <li>a descriptor to provide a watermarked column name from the input table</li>
+ *   <li>an interval parameter to specify the length of window size</li>
+ * </ol>
  */
 public class SqlTumbleTableFunction extends SqlWindowTableFunction {
   public SqlTumbleTableFunction() {
-    super(SqlKind.TUMBLE.name());
+    super(SqlKind.TUMBLE.name(), OperandTypeCheckerImpl.INSTANCE);
   }
 
   @Override public SqlOperandCountRange getOperandCountRange() {
     return SqlOperandCountRanges.between(3, 4);
   }
 
-  @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
-      boolean throwOnFailure) {
-    // There should only be three operands, and number of operands are checked before
-    // this call.
-    final SqlNode operand0 = callBinding.operand(0);
-    final SqlValidator validator = callBinding.getValidator();
-    final RelDataType type = validator.getValidatedNodeType(operand0);
-    if (type.getSqlTypeName() != SqlTypeName.ROW) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+  @Override public List<String> getParamNames() {
+    return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE, PARAM_OFFSET);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /** Operand type checker for TUMBLE. */
+  private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
+    static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();
+
+    @Override public boolean checkOperandTypes(
+        SqlCallBinding callBinding, boolean throwOnFailure) {
+      // There should be three operands (four with the optional offset); the number
+      // of operands is checked before this call.
+      if (!validateTableWithFollowingDescriptors(callBinding, 1)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+      }
+      if (!validateTailingIntervals(callBinding, 2)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+      }
+      return true;
     }
-    final SqlNode operand1 = callBinding.operand(1);
-    if (operand1.getKind() != SqlKind.DESCRIPTOR) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+    @Override public SqlOperandCountRange getOperandCountRange() {
+      return SqlOperandCountRanges.of(4);
     }
-    validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
-    final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
-    if (!SqlTypeUtil.isInterval(type2)) {
-      return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+    @Override public String getAllowedSignatures(SqlOperator op, String opName) {
+      return opName + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval"
+          + "[, datetime interval])";
     }
-    if (callBinding.getOperandCount() > 3) {
-      final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
-      if (!SqlTypeUtil.isInterval(type3)) {
-        return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
-      }
+
+    @Override public Consistency getConsistency() {
+      return Consistency.NONE;
     }
-    return true;
-  }
 
-  @Override public String getAllowedSignatures(String opNameToUse) {
-    return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval"
-        + "[, datetime interval])";
+    @Override public boolean isOptional(int i) {
+      return i == 3;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
index a1670c9..2b4dfd3 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
@@ -19,8 +19,10 @@ package org.apache.calcite.sql;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.SqlNameMatcher;
 import org.apache.calcite.sql.validate.SqlValidator;
 
@@ -34,6 +36,25 @@ import static org.apache.calcite.util.Static.RESOURCE;
  */
 public class SqlWindowTableFunction extends SqlFunction
     implements SqlTableFunction {
+
+  /** The data source which the table function computes with. */
+  protected static final String PARAM_DATA = "DATA";
+
+  /** The time attribute column. Also known as the event time. */
+  protected static final String PARAM_TIMECOL = "TIMECOL";
+
+  /** The window duration INTERVAL. */
+  protected static final String PARAM_SIZE = "SIZE";
+
+  /** The optional align offset for each window. */
+  protected static final String PARAM_OFFSET = "OFFSET";
+
+  /** The session key(s), only used for SESSION window. */
+  protected static final String PARAM_KEY = "KEY";
+
+  /** The slide interval, only used for HOP window. */
+  protected static final String PARAM_SLIDE = "SLIDE";
+
   /**
    * Type-inference strategy whereby the row type of a table function call is a
    * ROW, which is combined from the row type of operand #0 (which is a TABLE)
@@ -48,16 +69,16 @@ public class SqlWindowTableFunction extends SqlFunction
       SqlWindowTableFunction::inferRowType;
 
   /** Creates a window table function with a given name. */
-  public SqlWindowTableFunction(String name) {
-    super(name, SqlKind.OTHER_FUNCTION, ReturnTypes.CURSOR, null, null,
-        SqlFunctionCategory.SYSTEM);
+  public SqlWindowTableFunction(String name, SqlOperandTypeChecker operandTypeChecker) {
+    super(name, SqlKind.OTHER_FUNCTION, ReturnTypes.CURSOR, null,
+        operandTypeChecker, SqlFunctionCategory.SYSTEM);
   }
 
   @Override public SqlReturnTypeInference getRowTypeInference() {
     return ARG0_TABLE_FUNCTION_WINDOWING;
   }
 
-  protected boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
+  protected static boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
       boolean throwOnFailure) {
     if (throwOnFailure) {
       throw callBinding.newValidationSignatureError();
@@ -66,7 +87,53 @@ public class SqlWindowTableFunction extends SqlFunction
     }
   }
 
-  protected void validateColumnNames(SqlValidator validator,
+  /**
+   * Validates that the leading operands are in the form:
+   * (ROW, DESCRIPTOR, DESCRIPTOR ..., other params).
+   *
+   * @param callBinding The call binding
+   * @param descriptors The number of descriptors following the first operand (e.g. the table)
+   *
+   * @return true if validation passes
+   */
+  protected static boolean validateTableWithFollowingDescriptors(
+      SqlCallBinding callBinding, int descriptors) {
+    final SqlNode operand0 = callBinding.operand(0);
+    final SqlValidator validator = callBinding.getValidator();
+    final RelDataType type = validator.getValidatedNodeType(operand0);
+    if (type.getSqlTypeName() != SqlTypeName.ROW) {
+      return false;
+    }
+    for (int i = 1; i < descriptors + 1; i++) {
+      final SqlNode operand = callBinding.operand(i);
+      if (operand.getKind() != SqlKind.DESCRIPTOR) {
+        return false;
+      }
+      validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand).getOperandList());
+    }
+    return true;
+  }
+
+  /**
+   * Validates that the operands starting from position {@code startPos} are all INTERVAL.
+   *
+   * @param callBinding The call binding
+   * @param startPos    The start position to validate (starting index is 0)
+   *
+   * @return true if validation passes
+   */
+  protected static boolean validateTailingIntervals(SqlCallBinding callBinding, int startPos) {
+    final SqlValidator validator = callBinding.getValidator();
+    for (int i = startPos; i < callBinding.getOperandCount(); i++) {
+      final RelDataType type = validator.getValidatedNodeType(callBinding.operand(i));
+      if (!SqlTypeUtil.isInterval(type)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static void validateColumnNames(SqlValidator validator,
       List<String> fieldNames, List<SqlNode> columnNames) {
     final SqlNameMatcher matcher = validator.getCatalogReader().nameMatcher();
     for (SqlNode columnName : columnNames) {
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java
index f303eda..4d5e3f5 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java
@@ -47,4 +47,8 @@ class SqlArgumentAssignmentOperator extends SqlAsOperator {
     writer.keyword(getName());
     call.operand(0).unparse(writer, getRightPrec(), rightPrec);
   }
+
+  @Override public boolean argumentMustBeScalar(int ordinal) {
+    return false;
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 19e0e21..9123f92 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -3318,7 +3318,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
    * @param clause    Name of clause: "WHERE", "GROUP BY", "ON"
    */
   private void validateNoAggs(AggFinder aggFinder, SqlNode node,
-                              String clause) {
+      String clause) {
     final SqlCall agg = aggFinder.findAgg(node);
     if (agg == null) {
       return;
@@ -3500,7 +3500,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
   }
 
   private void checkRollUp(SqlNode grandParent, SqlNode parent,
-                           SqlNode current, SqlValidatorScope scope, String optionalClause) {
+      SqlNode current, SqlValidatorScope scope, String optionalClause) {
     current = stripAs(current);
     if (current instanceof SqlCall && !(current instanceof SqlSelect)) {
       // Validate OVER separately
@@ -3525,7 +3525,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
   }
 
   private void checkRollUp(SqlNode grandParent, SqlNode parent,
-                           SqlNode current, SqlValidatorScope scope) {
+      SqlNode current, SqlValidatorScope scope) {
     checkRollUp(grandParent, parent, current, scope, null);
   }
 
@@ -3568,7 +3568,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
 
   // Returns true iff the given column is valid inside the given aggCall.
   private boolean isRolledUpColumnAllowedInAgg(SqlIdentifier identifier, SqlValidatorScope scope,
-                                               SqlCall aggCall, SqlNode parent) {
+      SqlCall aggCall, SqlNode parent) {
     Pair<String, String> pair = findTableColumnPair(identifier, scope);
 
     if (pair == null) {
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 3a42da3..5a91208 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -77,6 +77,7 @@ ColumnNotFoundDidYouMean=Column ''{0}'' not found in any table; did you mean ''{
 ColumnNotFoundInTable=Column ''{0}'' not found in table ''{1}''
 ColumnNotFoundInTableDidYouMean=Column ''{0}'' not found in table ''{1}''; did you mean ''{2}''?
 ColumnAmbiguous=Column ''{0}'' is ambiguous
+ParamNotFoundInFunctionDidYouMean = Param ''{0}'' not found in function ''{1}''; did you mean ''{2}''?
 NeedQueryOp=Operand {0} must be a query
 NeedSameTypeParameter=Parameters must be of the same type
 CanNotApplyOp2Type=Cannot apply ''{0}'' to arguments of type {1}. Supported form(s): {2}
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 4100557..44124a6 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -1829,6 +1829,26 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
     sql(sql).ok();
   }
 
+  @Test void testTableFunctionTumbleWithParamNames() {
+    final String sql = "select *\n"
+        + "from table(\n"
+        + "tumble(\n"
+        + "  DATA => table Shipments,\n"
+        + "  TIMECOL => descriptor(rowtime),\n"
+        + "  SIZE => INTERVAL '1' MINUTE))";
+    sql(sql).ok();
+  }
+
+  @Test void testTableFunctionTumbleWithParamReordered() {
+    final String sql = "select *\n"
+        + "from table(\n"
+        + "tumble(\n"
+        + "  DATA => table Shipments,\n"
+        + "  SIZE => INTERVAL '1' MINUTE,\n"
+        + "  TIMECOL => descriptor(rowtime)))";
+    sql(sql).ok();
+  }
+
   @Test void testTableFunctionTumbleWithInnerJoin() {
     final String sql = "select *\n"
         + "from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE)) a\n"
@@ -1858,6 +1878,28 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
     sql(sql).ok();
   }
 
+  @Test void testTableFunctionHopWithParamNames() {
+    final String sql = "select *\n"
+        + "from table(\n"
+        + "hop(\n"
+        + "  DATA => table Shipments,\n"
+        + "  TIMECOL => descriptor(rowtime),\n"
+        + "  SLIDE => INTERVAL '1' MINUTE,\n"
+        + "  SIZE => INTERVAL '2' MINUTE))";
+    sql(sql).ok();
+  }
+
+  @Test void testTableFunctionHopWithParamReordered() {
+    final String sql = "select *\n"
+        + "from table(\n"
+        + "hop(\n"
+        + "  DATA => table Shipments,\n"
+        + "  SLIDE => INTERVAL '1' MINUTE,\n"
+        + "  TIMECOL => descriptor(rowtime),\n"
+        + "  SIZE => INTERVAL '2' MINUTE))";
+    sql(sql).ok();
+  }
+
   @Test void testTableFunctionSession() {
     final String sql = "select *\n"
         + "from table(session(table Shipments, descriptor(rowtime), "
@@ -1865,6 +1907,28 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
     sql(sql).ok();
   }
 
+  @Test void testTableFunctionSessionWithParamNames() {
+    final String sql = "select *\n"
+        + "from table(\n"
+        + "session(\n"
+        + "  DATA => table Shipments,\n"
+        + "  TIMECOL => descriptor(rowtime),\n"
+        + "  KEY => descriptor(orderId),\n"
+        + "  SIZE => INTERVAL '10' MINUTE))";
+    sql(sql).ok();
+  }
+
+  @Test void testTableFunctionSessionWithParamReordered() {
+    final String sql = "select *\n"
+        + "from table(\n"
+        + "session(\n"
+        + "  DATA => table Shipments,\n"
+        + "  KEY => descriptor(orderId),\n"
+        + "  TIMECOL => descriptor(rowtime),\n"
+        + "  SIZE => INTERVAL '10' MINUTE))";
+    sql(sql).ok();
+  }
+
   @Test void testTableFunctionTumbleWithSubQueryParam() {
     final String sql = "select *\n"
         + "from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE))";
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 32cc678..bb6a82f 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10287,13 +10287,41 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
   }
 
   @Test public void testTumbleTableFunction() {
-    sql("select * from table(\n"
-        + "^tumble(table orders, descriptor(rowtime))^)")
-        .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
     sql("select rowtime, productid, orderid, 'window_start', 'window_end' from table(\n"
         + "tumble(table orders, descriptor(rowtime), interval '2' hour))").ok();
     sql("select rowtime, productid, orderid, 'window_start', 'window_end' from table(\n"
         + "tumble(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour))").ok();
+    // test named params.
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
+        + "tumble(\n"
+        + "data => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "size => interval '2' hour))").ok();
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
+        + "tumble(\n"
+        + "data => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "size => interval '2' hour,\n"
+        + "\"OFFSET\" => interval '1' hour))").ok();
+    // negative tests.
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
+        + "tumble(\n"
+        + "^\"data\"^ => table orders,\n"
+        + "TIMECOL => descriptor(rowtime),\n"
+        + "SIZE => interval '2' hour))")
+        .fails("Param 'data' not found in function 'TUMBLE'; did you mean 'DATA'\\?");
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
+        + "^tumble(\n"
+        + "data => table orders,\n"
+        + "SIZE => interval '2' hour)^)")
+        .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
+    sql("select * from table(\n"
+        + "^tumble(table orders, descriptor(rowtime))^)")
+        .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
     sql("select * from table(\n"
         + "^tumble(table orders, descriptor(rowtime), 'test')^)")
         .fails("Cannot apply 'TUMBLE' to arguments of type 'TUMBLE\\(<RECORDTYPE\\"
@@ -10326,6 +10354,34 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     sql("select * from table(\n"
         + "hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour, "
         + "interval '20' minute))").ok();
+    // test named params.
+    sql("select * from table(\n"
+        + "hop(\n"
+        + "data => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "slide => interval '2' hour,\n"
+        + "size => interval '1' hour))").ok();
+    sql("select * from table(\n"
+        + "hop(\n"
+        + "data => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "slide => interval '2' hour,\n"
+        + "size => interval '1' hour,\n"
+        + "\"OFFSET\" => interval '20' minute))").ok();
+    // negative tests.
+    sql("select * from table(\n"
+        + "hop(\n"
+        + "^\"data\"^ => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "slide => interval '2' hour,\n"
+        + "size => interval '1' hour))")
+        .fails("Param 'data' not found in function 'HOP'; did you mean 'DATA'\\?");
+    sql("select * from table(\n"
+        + "^hop(\n"
+        + "data => table orders,\n"
+        + "slide => interval '2' hour,\n"
+        + "size => interval '1' hour)^)")
+        .fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments");
     sql("select * from table(\n"
         + "^hop(table orders, descriptor(rowtime), interval '2' hour)^)")
         .fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments");
@@ -10334,25 +10390,25 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
         .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
             + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
             + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
-            + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+            + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
     sql("select * from table(\n"
         + "^hop(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
         .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
             + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
-            + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+            + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
     sql("select * from table(\n"
         + "^hop(table orders, 'test', interval '2' hour, interval '2' hour)^)")
         .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
             + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <INTERVAL HOUR>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
-            + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+            + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
     sql("select * from table(\n"
         + "^hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour, 'test')^)")
         .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
             + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
             + "<INTERVAL HOUR>, <CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, "
-            + "DESCRIPTOR\\(col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+            + "DESCRIPTOR\\(timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
     sql("select * from table(\n"
         + "hop(TABLE ^tabler_not_exist^, descriptor(rowtime), interval '2' hour, interval '1' hour))")
         .fails("Object 'TABLER_NOT_EXIST' not found");
@@ -10362,6 +10418,28 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     sql("select * from table(\n"
         + "session(table orders, descriptor(rowtime), descriptor(productid), interval '1' hour))")
         .ok();
+    // test named params.
+    sql("select * from table(\n"
+        + "session(\n"
+        + "data => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "key => descriptor(productid),\n"
+        + "size => interval '1' hour))")
+        .ok();
+    // negative tests.
+    sql("select * from table(\n"
+        + "session(\n"
+        + "^\"data\"^ => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "key => descriptor(productid),\n"
+        + "size => interval '1' hour))")
+        .fails("Param 'data' not found in function 'SESSION'; did you mean 'DATA'\\?");
+    sql("select * from table(\n"
+        + "^session(\n"
+        + "data => table orders,\n"
+        + "key => descriptor(productid),\n"
+        + "size => interval '1' hour)^)")
+        .fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments");
     sql("select * from table(\n"
         + "^session(table orders, descriptor(rowtime), interval '2' hour)^)")
         .fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments");
@@ -10370,19 +10448,19 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
         .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
             + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <COLUMN_LIST>, "
             + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
-            + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+            + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
     sql("select * from table(\n"
         + "^session(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
         .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
             + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
-            + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+            + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
     sql("select * from table(\n"
         + "^session(table orders, 'test', descriptor(productid), interval '2' hour)^)")
         .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
             + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <COLUMN_LIST>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
-            + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+            + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
     sql("select * from table(\n"
         + "session(TABLE ^tabler_not_exist^, descriptor(rowtime), descriptor(productid), interval '1' hour))")
         .fails("Object 'TABLER_NOT_EXIST' not found");
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 2758df7..69c57e2 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -5019,6 +5019,42 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testTableFunctionTumbleWithParamNames">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(
+tumble(
+  DATA => table Shipments,
+  TIMECOL => descriptor(rowtime),
+  SIZE => INTERVAL '1' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testTableFunctionTumbleWithParamReordered">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(
+tumble(
+  DATA => table Shipments,
+  SIZE => INTERVAL '1' MINUTE,
+  TIMECOL => descriptor(rowtime)))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testTableFunctionTumbleWithInnerJoin">
         <Resource name="sql">
             <![CDATA[select *
@@ -5067,6 +5103,44 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testTableFunctionHopWithParamNames">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(
+hop(
+  DATA => table Shipments,
+  TIMECOL => descriptor(rowtime),
+  SLIDE => INTERVAL '1' MINUTE,
+  SIZE => INTERVAL '2' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testTableFunctionHopWithParamReordered">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(
+hop(
+  DATA => table Shipments,
+  SLIDE => INTERVAL '1' MINUTE,
+  TIMECOL => descriptor(rowtime),
+  SIZE => INTERVAL '2' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testTableFunctionHopWithOffset">
         <Resource name="sql">
             <![CDATA[select *
@@ -5095,6 +5169,44 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testTableFunctionSessionWithParamNames">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(
+session(
+  DATA => table Shipments,
+  TIMECOL => descriptor(rowtime),
+  KEY => descriptor(orderId),
+  SIZE => INTERVAL '10' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testTableFunctionSessionWithParamReordered">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(
+session(
+  DATA => table Shipments,
+  KEY => descriptor(orderId),
+  TIMECOL => descriptor(rowtime),
+  SIZE => INTERVAL '10' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testTableFunctionTumbleWithSubQueryParam">
         <Resource name="sql">
             <![CDATA[select *
diff --git a/core/src/test/resources/sql/stream.iq b/core/src/test/resources/sql/stream.iq
index 724eabd..365994e 100644
--- a/core/src/test/resources/sql/stream.iq
+++ b/core/src/test/resources/sql/stream.iq
@@ -17,6 +17,24 @@
 #
 !use orinoco
 !set outputformat mysql
+SELECT * FROM TABLE(
+  TUMBLE(
+    DATA => TABLE ORDERS,
+    TIMECOL => DESCRIPTOR(ROWTIME),
+    SIZE => INTERVAL '1' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:15:00 | 2015-02-15 10:16:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:24:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:24:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:58:00 | 2015-02-15 10:59:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 11:10:00 | 2015-02-15 11:11:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
+
 SELECT * FROM TABLE(TUMBLE(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '1' MINUTE));
 +---------------------+----+---------+-------+---------------------+---------------------+
 | ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
@@ -78,6 +96,30 @@ SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE,
 
 !ok
 
+SELECT * FROM TABLE(
+  HOP(
+    DATA => TABLE ORDERS,
+    TIMECOL => DESCRIPTOR(ROWTIME),
+    SLIDE => INTERVAL '5' MINUTE,
+    SIZE => INTERVAL '10' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:10:00 | 2015-02-15 10:20:00 |
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:50:00 | 2015-02-15 11:00:00 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:55:00 | 2015-02-15 11:05:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 11:05:00 | 2015-02-15 11:15:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 11:10:00 | 2015-02-15 11:20:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(10 rows)
+
+!ok
+
 SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE, INTERVAL '2' MINUTE));
 +---------------------+----+---------+-------+---------------------+---------------------+
 | ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
@@ -130,6 +172,25 @@ SELECT * FROM TABLE(SESSION(TABLE ORDERS, DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUC
 
 !ok
 
+SELECT * FROM TABLE(
+  SESSION(
+    DATA => TABLE ORDERS,
+    TIMECOL => DESCRIPTOR(ROWTIME),
+    KEY => DESCRIPTOR(PRODUCT),
+    SIZE => INTERVAL '20' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:15:00 | 2015-02-15 10:35:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:24:15 | 2015-02-15 10:44:15 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:24:45 | 2015-02-15 10:44:45 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
+
 SELECT * FROM TABLE(SESSION((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUCT), INTERVAL '20' MINUTE));
 +---------------------+----+---------+-------+---------------------+---------------------+
 | ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index 3bf384f..d5928aa 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -1876,10 +1876,24 @@ is named as "fixed windowing".
 
 | Operator syntax      | Description
 |:-------------------- |:-----------
-| TUMBLE(table, DESCRIPTOR(datetime), interval [, offset ]) | Indicates a tumbling window of *interval* for *datetime*, optionally aligned at offset.
+| TUMBLE(data, DESCRIPTOR(timecol), size [, offset ]) | Indicates a tumbling window of *size* interval for *timecol*, optionally aligned at *offset*.
 
 Here is an example:
-`SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))`,
+```SQL
+SELECT * FROM TABLE(
+  TUMBLE(
+    TABLE orders,
+    DESCRIPTOR(rowtime),
+    INTERVAL '1' MINUTE));
+
+-- or with the named params
+-- note: the DATA param must be the first
+SELECT * FROM TABLE(
+  TUMBLE(
+    DATA => TABLE orders,
+    TIMECOL => DESCRIPTOR(rowtime),
+    SIZE => INTERVAL '1' MINUTE));
+```
 will apply tumbling with 1 minute window size on rows from table orders. rowtime is the
 watermarked column of table orders that tells data completeness.
 
@@ -1890,10 +1904,26 @@ on a timestamp column. Windows assigned could have overlapping so hopping someti
 
 | Operator syntax      | Description
 |:-------------------- |:-----------
-| HOP(table, DESCRIPTOR(datetime), slide, size [, offset ]) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide* and optionally aligned at offset.
+| HOP(data, DESCRIPTOR(timecol), slide, size [, offset ]) | Indicates a hopping window for *timecol*, covering rows within the interval of *size*, shifting every *slide* and optionally aligned at *offset*.
 
 Here is an example:
-`SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))`,
+```SQL
+SELECT * FROM TABLE(
+  HOP(
+    TABLE orders,
+    DESCRIPTOR(rowtime),
+    INTERVAL '2' MINUTE,
+    INTERVAL '5' MINUTE));
+
+-- or with the named params
+-- note: the DATA param must be the first
+SELECT * FROM TABLE(
+  HOP(
+    DATA => TABLE orders,
+    TIMECOL => DESCRIPTOR(rowtime),
+    SLIDE => INTERVAL '2' MINUTE,
+    SIZE => INTERVAL '5' MINUTE));
+```
 will apply hopping with 5-minute interval size on rows from table orders and shifting every 2 minutes. rowtime is the
 watermarked column of table orders that tells data completeness.
 
@@ -1904,13 +1934,34 @@ of rows are less than *interval*. Session window is applied per *key*.
 
 | Operator syntax      | Description
 |:-------------------- |:-----------
-| session(table, DESCRIPTOR(datetime), DESCRIPTOR(key), interval) | Indicates a session window of *interval* for *datetime*. Session window is applied per *key*.
+| session(data, DESCRIPTOR(timecol), DESCRIPTOR(key), size) | Indicates a session window of *size* interval for *timecol*. Session window is applied per *key*.
 
 Here is an example:
-`SELECT * FROM TABLE(SESSION(TABLE orders, DESCRIPTOR(rowtime), DESCRIPTOR(product), INTERVAL '20' MINUTE))`,
+```SQL
+SELECT * FROM TABLE(
+  SESSION(
+    TABLE orders,
+    DESCRIPTOR(rowtime),
+    DESCRIPTOR(product),
+    INTERVAL '20' MINUTE));
+
+-- or with the named params
+-- note: the DATA param must be the first
+SELECT * FROM TABLE(
+  SESSION(
+    DATA => TABLE orders,
+    TIMECOL => DESCRIPTOR(rowtime),
+    KEY => DESCRIPTOR(product),
+    SIZE => INTERVAL '20' MINUTE));
+```
 will apply session with 20-minute inactive gap on rows from table orders. rowtime is the
 watermarked column of table orders that tells data completeness. Session is applied per product.
 
+**Note**: The `Tumble`, `Hop` and `Session` window table functions assign
+each row in the original table to a window. The output table has all
+the same columns as the original table plus two additional columns `window_start`
+and `window_end`, which represent the start and end of the window interval, respectively.
+
 ### Grouped window functions
 **warning**: grouped window functions are deprecated.