You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by fe...@apache.org on 2020/05/09 12:23:56 UTC

[calcite] 02/02: [CALCITE-3780] SESSION Table function (Rui Wang)

This is an automated email from the ASF dual-hosted git repository.

fengzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit 890eb61efcccc486e2192110cefe4cac5aa6f150
Author: amaliujia <am...@163.com>
AuthorDate: Sat Feb 8 22:35:35 2020 -0800

    [CALCITE-3780] SESSION Table function (Rui Wang)
---
 core/src/main/codegen/templates/Parser.jj          |  24 ++-
 .../calcite/adapter/enumerable/EnumUtils.java      | 201 ++++++++++++++++++++-
 .../calcite/adapter/enumerable/RexImpTable.java    |  53 ++++--
 .../apache/calcite/sql/SqlHopTableFunction.java    |   4 +-
 ...eFunction.java => SqlSessionTableFunction.java} |  24 +--
 .../calcite/sql/fun/SqlStdOperatorTable.java       |  38 ++--
 .../apache/calcite/sql2rel/AuxiliaryConverter.java |   2 +-
 .../org/apache/calcite/util/BuiltInMethod.java     |   4 +-
 .../apache/calcite/test/SqlToRelConverterTest.java |  39 +++-
 .../org/apache/calcite/test/SqlValidatorTest.java  |  36 +++-
 .../apache/calcite/test/SqlToRelConverterTest.xml  |  72 +++++++-
 core/src/test/resources/sql/stream.iq              |  28 +++
 .../apache/calcite/linq4j/EnumerableDefaults.java  |  32 ----
 site/_docs/reference.md                            |  29 ++-
 14 files changed, 480 insertions(+), 106 deletions(-)

diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index b0c1395..18c3242 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -6050,19 +6050,31 @@ SqlCall GroupByWindowingCall():
 {
     final Span s;
     final List<SqlNode> args;
+    final SqlOperator op;
 }
 {
     (
-        <TUMBLE> { s = span(); }
-        args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
-            return SqlStdOperatorTable.TUMBLE.createCall(s.end(this), args);
+        <TUMBLE>
+        {
+            s = span();
+            op = SqlStdOperatorTable.TUMBLE_OLD;
+        }
+    |
+        <HOP>
+        {
+            s = span();
+            op = SqlStdOperatorTable.HOP_OLD;
         }
     |
-        <HOP> { s = span(); }
-        args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
-            return SqlStdOperatorTable.HOP.createCall(s.end(this), args);
+        <SESSION>
+        {
+            s = span();
+            op = SqlStdOperatorTable.SESSION_OLD;
         }
     )
+    args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+        return op.createCall(s.end(this), args);
+    }
 }
 
 SqlCall MatchRecognizeFunctionCall() :
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index b50a832..77779a4 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -45,6 +45,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.runtime.SortedMultiMap;
 import org.apache.calcite.runtime.SqlFunctions;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.Pair;
@@ -59,8 +60,10 @@ import java.math.BigDecimal;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Utilities for generating programs in the Enumerable (functional)
@@ -748,8 +751,12 @@ public class EnumUtils {
     return Expressions.lambda(Predicate2.class, builder.toBlock(), left_, right_);
   }
 
-  /** Generates a window selector which appends attribute of the window based on
-   * the parameters. */
+  /**
+   * Generates a window selector which appends attribute of the window based on
+   * the parameters.
+   *
+   * Note that it only works for batch scenario. E.g. all data is known and there is no late data.
+   */
   static Expression tumblingWindowSelector(
       PhysType inputPhysType,
       PhysType outputPhysType,
@@ -802,6 +809,147 @@ public class EnumUtils {
   }
 
   /**
+   * Creates enumerable implementation that applies sessionization to elements from the input
+   * enumerator based on a specified key. Elements are windowed into sessions separated by
+   * periods with no input for at least the duration specified by gap parameter.
+   */
+  public static Enumerable<Object[]> sessionize(Enumerator<Object[]> inputEnumerator,
+      int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+    return new AbstractEnumerable<Object[]>() {
+      @Override public Enumerator<Object[]> enumerator() {
+        return new SessionizationEnumerator(inputEnumerator,
+            indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+      }
+    };
+  }
+
+  private static class SessionizationEnumerator implements Enumerator<Object[]> {
+    private final Enumerator<Object[]> inputEnumerator;
+    private final int indexOfWatermarkedColumn;
+    private final int indexOfKeyColumn;
+    private final long gap;
+    private LinkedList<Object[]> list;
+    private boolean initialized;
+
+    /**
+     * Note that it only works for batch scenario. E.g. all data is known and there is no
+     * late data.
+     *
+     * @param inputEnumerator the enumerator to provide an array of objects as input
+     * @param indexOfWatermarkedColumn the index of timestamp column upon which a watermark is built
+     * @param indexOfKeyColumn the index of column that acts as grouping key
+     * @param gap gap parameter
+     */
+    SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+        int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+      this.inputEnumerator = inputEnumerator;
+      this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+      this.indexOfKeyColumn = indexOfKeyColumn;
+      this.gap = gap;
+      list = new LinkedList<>();
+      initialized = false;
+    }
+
+    @Override public Object[] current() {
+      if (!initialized) {
+        initialize();
+        initialized = true;
+      }
+      return list.pollFirst();
+    }
+
+    @Override public boolean moveNext() {
+      return initialized ? list.size() > 0 : inputEnumerator.moveNext();
+    }
+
+    @Override public void reset() {
+      list.clear();
+      inputEnumerator.reset();
+      initialized = false;
+    }
+
+    @Override public void close() {
+      list.clear();
+      inputEnumerator.close();
+      initialized = false;
+    }
+
+    private void initialize() {
+      List<Object[]> elements = new ArrayList<>();
+      // initialize() will be called when inputEnumerator.moveNext() is true,
+      // thus firstly should take the current element.
+      elements.add(inputEnumerator.current());
+      // sessionization needs to see all data.
+      while (inputEnumerator.moveNext()) {
+        elements.add(inputEnumerator.current());
+      }
+
+      Map<Object, SortedMultiMap<Pair<Long, Long>, Object[]>> sessionKeyMap = new HashMap<>();
+      for (Object[] element : elements) {
+        sessionKeyMap.putIfAbsent(element[indexOfKeyColumn], new SortedMultiMap<>());
+        Pair initWindow = computeInitWindow(
+            SqlFunctions.toLong(element[indexOfWatermarkedColumn]), gap);
+        sessionKeyMap.get(element[indexOfKeyColumn]).putMulti(initWindow, element);
+      }
+
+      // merge per key session windows if there is any overlap between windows.
+      for (Map.Entry<Object, SortedMultiMap<Pair<Long, Long>, Object[]>> perKeyEntry
+          : sessionKeyMap.entrySet()) {
+        Map<Pair<Long, Long>, List<Object[]>> finalWindowElementsMap = new HashMap<>();
+        Pair<Long, Long> currentWindow = null;
+        List<Object[]> tempElementList = new ArrayList<>();
+        for (Map.Entry<Pair<Long, Long>, List<Object[]>> sessionEntry
+            : perKeyEntry.getValue().entrySet()) {
+          // check the next window can be merged.
+          if (currentWindow == null || !isOverlapped(currentWindow, sessionEntry.getKey())) {
+            // cannot merge window as there is no overlap
+            if (currentWindow != null) {
+              finalWindowElementsMap.put(currentWindow, new ArrayList<>(tempElementList));
+            }
+
+            currentWindow = sessionEntry.getKey();
+            tempElementList.clear();
+            tempElementList.addAll(sessionEntry.getValue());
+          } else {
+            // merge windows.
+            currentWindow = mergeWindows(currentWindow, sessionEntry.getKey());
+            // merge elements in windows.
+            tempElementList.addAll(sessionEntry.getValue());
+          }
+        }
+
+        if (!tempElementList.isEmpty()) {
+          finalWindowElementsMap.put(currentWindow, new ArrayList<>(tempElementList));
+        }
+
+        // construct final results from finalWindowElementsMap.
+        for (Map.Entry<Pair<Long, Long>, List<Object[]>> finalWindowElementsEntry
+            : finalWindowElementsMap.entrySet()) {
+          for (Object[] element : finalWindowElementsEntry.getValue()) {
+            Object[] curWithWindow = new Object[element.length + 2];
+            System.arraycopy(element, 0, curWithWindow, 0, element.length);
+            curWithWindow[element.length] = finalWindowElementsEntry.getKey().left;
+            curWithWindow[element.length + 1] = finalWindowElementsEntry.getKey().right;
+            list.offer(curWithWindow);
+          }
+        }
+      }
+    }
+
+    private boolean isOverlapped(Pair<Long, Long> a, Pair<Long, Long> b) {
+      return !(b.left >= a.right);
+    }
+
+    private Pair<Long, Long> mergeWindows(Pair<Long, Long> a, Pair<Long, Long> b) {
+      return new Pair<>(a.left <= b.left ? a.left : b.left, a.right >= b.right ? a.right : b.right);
+    }
+
+    private Pair<Long, Long> computeInitWindow(long ts, long gap) {
+      return new Pair<>(ts, ts + gap);
+    }
+  }
+
+  /**
    * Create enumerable implementation that applies hopping on each element from the input
    * enumerator and produces at least one element for each input element.
    */
@@ -822,11 +970,19 @@ public class EnumUtils {
     private final long intervalSize;
     private LinkedList<Object[]> list;
 
+    /**
+     * Note that it only works for batch scenario. E.g. all data is known and there is no late data.
+     *
+     * @param inputEnumerator the enumerator to provide an array of objects as input
+     * @param indexOfWatermarkedColumn the index of timestamp column upon which a watermark is built
+     * @param slide sliding size
+     * @param intervalSize window size
+     */
     HopEnumerator(Enumerator<Object[]> inputEnumerator,
-        int indexOfWatermarkedColumn, long emitFrequency, long intervalSize) {
+        int indexOfWatermarkedColumn, long slide, long intervalSize) {
       this.inputEnumerator = inputEnumerator;
       this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
-      this.emitFrequency = emitFrequency;
+      this.emitFrequency = slide;
       this.intervalSize = intervalSize;
       list = new LinkedList<>();
     }
@@ -850,10 +1006,7 @@ public class EnumUtils {
     }
 
     public boolean moveNext() {
-      if (list.size() > 0) {
-        return true;
-      }
-      return inputEnumerator.moveNext();
+      return list.size() > 0 || inputEnumerator.moveNext();
     }
 
     public void reset() {
@@ -879,4 +1032,36 @@ public class EnumUtils {
     }
     return ret;
   }
+
+  /**
+   * Apply tumbling per row from the enumerable input.
+   */
+  public static <TSource, TResult> Enumerable<TResult> tumbling(
+      Enumerable<TSource> inputEnumerable,
+      Function1<TSource, TResult> outSelector) {
+    return new AbstractEnumerable<TResult>() {
+      // Applies tumbling on each element from the input enumerator and produces
+      // exactly one element for each input element.
+      @Override public Enumerator<TResult> enumerator() {
+        return new Enumerator<TResult>() {
+          Enumerator<TSource> inputs = inputEnumerable.enumerator();
+
+          public TResult current() {
+            return outSelector.apply(inputs.current());
+          }
+
+          public boolean moveNext() {
+            return inputs.moveNext();
+          }
+
+          public void reset() {
+            inputs.reset();
+          }
+
+          public void close() {
+          }
+        };
+      }
+    };
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index 77a366f..230e9de 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -184,7 +184,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING_ID;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.HOP_TVF;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.HOP;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INITCAP;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INTERSECTION;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_A_SET;
@@ -267,6 +267,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.REPLACE;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.ROUND;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.ROW;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.ROW_NUMBER;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SESSION;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SESSION_USER;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SIGN;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SIMILAR_TO;
@@ -283,7 +284,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SYSTEM_USER;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TAN;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TRIM;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TRUNCATE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TUMBLE_TVF;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TUMBLE;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.UNARY_MINUS;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.UNARY_PLUS;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.UPPER;
@@ -673,8 +674,9 @@ public class RexImpTable {
     matchMap.put(CLASSIFIER, ClassifierImplementor::new);
     matchMap.put(LAST, LastImplementor::new);
     map.put(PREV, new PrevImplementor());
-    tvfImplementorMap.put(TUMBLE_TVF, TumbleImplementor::new);
-    tvfImplementorMap.put(HOP_TVF, HopImplementor::new);
+    tvfImplementorMap.put(TUMBLE, TumbleImplementor::new);
+    tvfImplementorMap.put(HOP, HopImplementor::new);
+    tvfImplementorMap.put(SESSION, SessionImplementor::new);
   }
 
   private <T> Supplier<T> constructorSupplier(Class<T> klass) {
@@ -3239,23 +3241,50 @@ public class RexImpTable {
   private static class HopImplementor implements TableFunctionCallImplementor {
     @Override public Expression implement(RexToLixTranslator translator,
         Expression inputEnumerable, RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
-      Expression intervalExpression = translator.translate(call.getOperands().get(2));
-      Expression intervalExpression2 = translator.translate(call.getOperands().get(3));
-      RexCall descriptor = (RexCall) call.getOperands().get(1);
+      Expression slidingInterval = translator.translate(call.getOperands().get(1));
+      Expression windowSize = translator.translate(call.getOperands().get(2));
+      RexCall descriptor = (RexCall) call.getOperands().get(0);
       List<Expression> translatedOperands = new ArrayList<>();
       Expression wmColIndexExpr =
           Expressions.constant(((RexInputRef) descriptor.getOperands().get(0)).getIndex());
       translatedOperands.add(wmColIndexExpr);
-      translatedOperands.add(intervalExpression);
-      translatedOperands.add(intervalExpression2);
+      translatedOperands.add(slidingInterval);
+      translatedOperands.add(windowSize);
 
       return Expressions.call(
           BuiltInMethod.HOPPING.method,
           Expressions.list(
               Expressions.call(inputEnumerable, BuiltInMethod.ENUMERABLE_ENUMERATOR.method),
-              wmColIndexExpr,
-              intervalExpression,
-              intervalExpression2));
+              translatedOperands.get(0),
+              translatedOperands.get(1),
+              translatedOperands.get(2)));
+    }
+  }
+
+  /** Implements per-key sessionization. */
+  private static class SessionImplementor implements TableFunctionCallImplementor {
+    @Override public Expression implement(RexToLixTranslator translator,
+        Expression inputEnumerable, RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
+      RexCall timestampDescriptor = (RexCall) call.getOperands().get(0);
+      RexCall keyDescriptor = (RexCall) call.getOperands().get(1);
+      Expression gapInterval = translator.translate(call.getOperands().get(2));
+
+      List<Expression> translatedOperands = new ArrayList<>();
+      Expression wmColIndexExpr =
+          Expressions.constant(((RexInputRef) timestampDescriptor.getOperands().get(0)).getIndex());
+      Expression keyColIndexExpr =
+          Expressions.constant(((RexInputRef) keyDescriptor.getOperands().get(0)).getIndex());
+      translatedOperands.add(wmColIndexExpr);
+      translatedOperands.add(keyColIndexExpr);
+      translatedOperands.add(gapInterval);
+
+      return Expressions.call(
+          BuiltInMethod.SESSIONIZATION.method,
+          Expressions.list(
+              Expressions.call(inputEnumerable, BuiltInMethod.ENUMERABLE_ENUMERATOR.method),
+              translatedOperands.get(0),
+              translatedOperands.get(1),
+              translatedOperands.get(2)));
     }
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
index d32b909..43b2b91 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -40,8 +40,6 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
 
   @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
       boolean throwOnFailure) {
-    // There should only be three operands, and number of operands are checked before
-    // this call.
     final SqlNode operand0 = callBinding.operand(0);
     final SqlValidator validator = callBinding.getValidator();
     final RelDataType type = validator.getValidatedNodeType(operand0);
@@ -65,7 +63,7 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
   }
 
   @Override public String getAllowedSignatures(String opNameToUse) {
-    return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), "
+    return getName() + "(TABLE table_name, DESCRIPTOR(col), "
         + "datetime interval, datetime interval)";
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
similarity index 76%
copy from core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
copy to core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
index d32b909..fef50cc 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
@@ -23,15 +23,16 @@ import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.SqlValidator;
 
 /**
- * SqlHopTableFunction implements an operator for hopping. It allows four parameters:
+ * SqlSessionTableFunction implements an operator for per-key sessionization. It allows
+ * four parameters:
  * 1. a table.
  * 2. a descriptor to provide a watermarked column name from the input table.
- * 3. an interval parameter to specify the length of window shifting.
- * 4. an interval parameter to specify the length of window size.
+ * 3. a descriptor to provide a column as key, on which sessionization will be applied.
+ * 4. an interval parameter to specify a inactive activity gap to break sessions.
  */
-public class SqlHopTableFunction extends SqlWindowTableFunction {
-  public SqlHopTableFunction() {
-    super(SqlKind.HOP.name());
+public class SqlSessionTableFunction extends SqlWindowTableFunction {
+  public SqlSessionTableFunction() {
+    super(SqlKind.SESSION.name());
   }
 
   @Override public SqlOperandCountRange getOperandCountRange() {
@@ -40,8 +41,6 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
 
   @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
       boolean throwOnFailure) {
-    // There should only be three operands, and number of operands are checked before
-    // this call.
     final SqlNode operand0 = callBinding.operand(0);
     final SqlValidator validator = callBinding.getValidator();
     final RelDataType type = validator.getValidatedNodeType(operand0);
@@ -53,10 +52,11 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
       return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
     }
     validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
-    final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
-    if (!SqlTypeUtil.isInterval(type2)) {
+    final SqlNode operand2 = callBinding.operand(2);
+    if (operand2.getKind() != SqlKind.DESCRIPTOR) {
       return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
     }
+    validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand2).getOperandList());
     final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
     if (!SqlTypeUtil.isInterval(type3)) {
       return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
@@ -65,7 +65,7 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
   }
 
   @Override public String getAllowedSignatures(String opNameToUse) {
-    return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), "
-        + "datetime interval, datetime interval)";
+    return getName() + "(TABLE table_name, DESCRIPTOR(col), "
+        + "DESCRIPTOR(col), datetime interval)";
   }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
index 526df03..e026b95 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
@@ -45,6 +45,7 @@ import org.apache.calcite.sql.SqlPrefixOperator;
 import org.apache.calcite.sql.SqlProcedureCallOperator;
 import org.apache.calcite.sql.SqlRankFunction;
 import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSessionTableFunction;
 import org.apache.calcite.sql.SqlSetOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlSyntax;
@@ -2294,11 +2295,14 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
   /** DESCRIPTOR(column_name, ...). */
   public static final SqlOperator DESCRIPTOR = new SqlDescriptorOperator();
 
-  /** TUMBLE as a table-value function. */
-  public static final SqlFunction TUMBLE_TVF = new SqlTumbleTableFunction();
+  /** TUMBLE as a table function. */
+  public static final SqlFunction TUMBLE = new SqlTumbleTableFunction();
 
-  /** HOP as a table-value function. */
-  public static final SqlFunction HOP_TVF = new SqlHopTableFunction();
+  /** HOP as a table function. */
+  public static final SqlFunction HOP = new SqlHopTableFunction();
+
+  /** SESSION as a table function. */
+  public static final SqlFunction SESSION = new SqlSessionTableFunction();
 
   /** The {@code TUMBLE} group function.
    *
@@ -2313,7 +2317,7 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
    * this TUMBLE group function, and in fact all group functions. See
    * [CALCITE-3340] for details.
    */
-  public static final SqlGroupedWindowFunction TUMBLE =
+  public static final SqlGroupedWindowFunction TUMBLE_OLD =
       new SqlGroupedWindowFunction("$TUMBLE", SqlKind.TUMBLE,
           null, ReturnTypes.ARG0, null,
           OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
@@ -2327,15 +2331,15 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
   /** The {@code TUMBLE_START} auxiliary function of
    * the {@code TUMBLE} group function. */
   public static final SqlGroupedWindowFunction TUMBLE_START =
-      TUMBLE.auxiliary(SqlKind.TUMBLE_START);
+      TUMBLE_OLD.auxiliary(SqlKind.TUMBLE_START);
 
   /** The {@code TUMBLE_END} auxiliary function of
    * the {@code TUMBLE} group function. */
   public static final SqlGroupedWindowFunction TUMBLE_END =
-      TUMBLE.auxiliary(SqlKind.TUMBLE_END);
+      TUMBLE_OLD.auxiliary(SqlKind.TUMBLE_END);
 
   /** The {@code HOP} group function. */
-  public static final SqlGroupedWindowFunction HOP =
+  public static final SqlGroupedWindowFunction HOP_OLD =
       new SqlGroupedWindowFunction("$HOP", SqlKind.HOP, null,
           ReturnTypes.ARG0, null,
           OperandTypes.or(OperandTypes.DATETIME_INTERVAL_INTERVAL,
@@ -2349,16 +2353,16 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
   /** The {@code HOP_START} auxiliary function of
    * the {@code HOP} group function. */
   public static final SqlGroupedWindowFunction HOP_START =
-      HOP.auxiliary(SqlKind.HOP_START);
+      HOP_OLD.auxiliary(SqlKind.HOP_START);
 
   /** The {@code HOP_END} auxiliary function of
    * the {@code HOP} group function. */
   public static final SqlGroupedWindowFunction HOP_END =
-      HOP.auxiliary(SqlKind.HOP_END);
+      HOP_OLD.auxiliary(SqlKind.HOP_END);
 
   /** The {@code SESSION} group function. */
-  public static final SqlGroupedWindowFunction SESSION =
-      new SqlGroupedWindowFunction(SqlKind.SESSION.name(), SqlKind.SESSION,
+  public static final SqlGroupedWindowFunction SESSION_OLD =
+      new SqlGroupedWindowFunction("$SESSION", SqlKind.SESSION,
           null, ReturnTypes.ARG0, null,
           OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
               OperandTypes.DATETIME_INTERVAL_TIME),
@@ -2371,12 +2375,12 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
   /** The {@code SESSION_START} auxiliary function of
    * the {@code SESSION} group function. */
   public static final SqlGroupedWindowFunction SESSION_START =
-      SESSION.auxiliary(SqlKind.SESSION_START);
+      SESSION_OLD.auxiliary(SqlKind.SESSION_START);
 
   /** The {@code SESSION_END} auxiliary function of
    * the {@code SESSION} group function. */
   public static final SqlGroupedWindowFunction SESSION_END =
-      SESSION.auxiliary(SqlKind.SESSION_END);
+      SESSION_OLD.auxiliary(SqlKind.SESSION_END);
 
   /** {@code |} operator to create alternate patterns
    * within {@code MATCH_RECOGNIZE}.
@@ -2495,13 +2499,13 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
     switch (kind) {
     case TUMBLE_START:
     case TUMBLE_END:
-      return TUMBLE;
+      return TUMBLE_OLD;
     case HOP_START:
     case HOP_END:
-      return HOP;
+      return HOP_OLD;
     case SESSION_START:
     case SESSION_END:
-      return SESSION;
+      return SESSION_OLD;
     default:
       return null;
     }
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java
index a69405c..c0bc1a1 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java
@@ -25,7 +25,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 /** Converts an expression for a group window function (e.g. TUMBLE)
  * into an expression for an auxiliary group function (e.g. TUMBLE_START).
  *
- * @see SqlStdOperatorTable#TUMBLE
+ * @see SqlStdOperatorTable#TUMBLE_OLD
  */
 public interface AuxiliaryConverter {
   /** Converts an expression.
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index fc60dcc..f0b8f81 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -592,8 +592,10 @@ public enum BuiltInMethod {
       "resultSelector", Function2.class),
   AGG_LAMBDA_FACTORY_ACC_SINGLE_GROUP_RESULT_SELECTOR(AggregateLambdaFactory.class,
       "singleGroupResultSelector", Function1.class),
-  TUMBLING(EnumerableDefaults.class, "tumbling", Enumerable.class, Function1.class),
+  TUMBLING(EnumUtils.class, "tumbling", Enumerable.class, Function1.class),
   HOPPING(EnumUtils.class, "hopping", Enumerator.class, int.class, long.class,
+      long.class),
+  SESSIONIZATION(EnumUtils.class, "sessionize", Enumerator.class, int.class, int.class,
       long.class);
 
   public final Method method;
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 7f8dc7e..3144d1e 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -1805,18 +1805,53 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
     sql(sql).ok();
   }
 
-  @Test void testTableValuedFunctionTumble() {
+  @Test void testTableFunctionTumble() {
     final String sql = "select *\n"
         + "from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE))";
     sql(sql).ok();
   }
 
-  @Test void testTableValuedFunctionTumbleWithSubQueryParam() {
+  @Test public void testTableFunctionHop() {
+    final String sql = "select *\n"
+        + "from table(hop(table Shipments, descriptor(rowtime), "
+        + "INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))";
+    sql(sql).ok();
+  }
+
+  @Test public void testTableFunctionSession() {
+    final String sql = "select *\n"
+        + "from table(session(table Shipments, descriptor(rowtime), "
+        + "descriptor(orderId), INTERVAL '10' MINUTE))";
+    sql(sql).ok();
+  }
+
+  @Test void testTableFunctionTumbleWithSubQueryParam() {
     final String sql = "select *\n"
         + "from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE))";
     sql(sql).ok();
   }
 
+  @Test public void testTableFunctionHopWithSubQueryParam() {
+    final String sql = "select *\n"
+        + "from table(hop((select * from Shipments), descriptor(rowtime), "
+        + "INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))";
+    sql(sql).ok();
+  }
+
+  @Test public void testTableFunctionSessionWithSubQueryParam() {
+    final String sql = "select *\n"
+        + "from table(session((select * from Shipments), descriptor(rowtime), "
+        + "descriptor(orderId), INTERVAL '10' MINUTE))";
+    sql(sql).ok();
+  }
+
+  @Test public void testTableFunctionSessionCompoundSessionKey() {
+    final String sql = "select *\n"
+        + "from table(session(table Orders, descriptor(rowtime), "
+        + "descriptor(orderId, productId), INTERVAL '10' MINUTE))";
+    sql(sql).ok();
+  }
+
   @Test void testNotNotIn() {
     final String sql = "select * from EMP where not (ename not in ('Fred') )";
     sql(sql).ok();
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 241898e..f199035 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10248,24 +10248,54 @@ class SqlValidatorTest extends SqlValidatorTestCase {
         .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
             + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
             + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
-            + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+            + "col\\), datetime interval, datetime interval\\)");
     sql("select * from table(\n"
         + "^hop(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
         .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
             + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
-            + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+            + "col\\), datetime interval, datetime interval\\)");
     sql("select * from table(\n"
         + "^hop(table orders, 'test', interval '2' hour, interval '2' hour)^)")
         .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
             + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <INTERVAL HOUR>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
-            + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+            + "col\\), datetime interval, datetime interval\\)");
     sql("select * from table(\n"
         + "hop(TABLE ^tabler_not_exist^, descriptor(rowtime), interval '2' hour, interval '1' hour))")
         .fails("Object 'TABLER_NOT_EXIST' not found");
   }
 
+  @Test public void testSessionTableFunction() {
+    sql("select * from table(\n"
+        + "session(table orders, descriptor(rowtime), descriptor(productid), interval '1' hour))")
+        .ok();
+    sql("select * from table(\n"
+        + "^session(table orders, descriptor(rowtime), interval '2' hour)^)")
+        .fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments");
+    sql("select * from table(\n"
+        + "^session(table orders, descriptor(rowtime), descriptor(productid), 'test')^)")
+        .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+            + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <COLUMN_LIST>, "
+            + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
+            + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+    sql("select * from table(\n"
+        + "^session(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
+        .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+            + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
+            + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
+            + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+    sql("select * from table(\n"
+        + "^session(table orders, 'test', descriptor(productid), interval '2' hour)^)")
+        .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+            + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <COLUMN_LIST>, "
+            + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
+            + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+    sql("select * from table(\n"
+        + "session(TABLE ^tabler_not_exist^, descriptor(rowtime), descriptor(productid), interval '1' hour))")
+        .fails("Object 'TABLER_NOT_EXIST' not found");
+  }
+
   @Test public void testStreamTumble() {
     // TUMBLE
     sql("select stream tumble_end(rowtime, interval '2' hour) as rowtime\n"
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index c19e32a..fb92f25 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -3576,7 +3576,7 @@ group by session(rowtime, interval '1' hour)]]>
 LogicalDelta
   LogicalProject(ROWTIME=[$0], EXPR$1=[$0], C=[$1])
     LogicalAggregate(group=[{0}], C=[COUNT()])
-      LogicalProject($f0=[SESSION($0, 3600000:INTERVAL HOUR)])
+      LogicalProject($f0=[$SESSION($0, 3600000:INTERVAL HOUR)])
         LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
 ]]>
         </Resource>
@@ -4931,6 +4931,34 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testTableFunctionHop">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(hop(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testTableFunctionSession">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(session(table Shipments, descriptor(rowtime), descriptor(orderId), INTERVAL '10' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testTableFunctionTumbleWithSubQueryParam">
         <Resource name="sql">
             <![CDATA[select *
@@ -4945,6 +4973,48 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testTableFunctionHopWithSubQueryParam">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(hop((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testTableFunctionSessionWithSubQueryParam">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(session((select * from Shipments), descriptor(rowtime), descriptor(orderId), INTERVAL '10' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+  LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+      LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testTableFunctionSessionCompoundSessionKey">
+        <Resource name="sql">
+            <![CDATA[select *
+from table(session(table Orders, descriptor(rowtime), descriptor(orderId, productId), INTERVAL '10' MINUTE))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2], window_start=[$3], window_end=[$4])
+  LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($0), DESCRIPTOR($2, $1), 600000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP(0) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+    LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2])
+      LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testTumbleTableRowtimeNotFirstColumn">
         <Resource name="sql">
             <![CDATA[select stream
diff --git a/core/src/test/resources/sql/stream.iq b/core/src/test/resources/sql/stream.iq
index 15f5832..ba208a7 100644
--- a/core/src/test/resources/sql/stream.iq
+++ b/core/src/test/resources/sql/stream.iq
@@ -82,3 +82,31 @@ SELECT * FROM TABLE(HOP((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), INTERVAL '5
 (10 rows)
 
 !ok
+
+SELECT * FROM TABLE(SESSION(TABLE ORDERS, DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUCT), INTERVAL '20' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:15:00 | 2015-02-15 10:35:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:24:15 | 2015-02-15 10:44:15 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:24:45 | 2015-02-15 10:44:45 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
+
+SELECT * FROM TABLE(SESSION((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUCT), INTERVAL '20' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME             | ID | PRODUCT | UNITS | window_start        | window_end          |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 |  1 | paint   |    10 | 2015-02-15 10:15:00 | 2015-02-15 10:35:00 |
+| 2015-02-15 10:24:15 |  2 | paper   |     5 | 2015-02-15 10:24:15 | 2015-02-15 10:44:15 |
+| 2015-02-15 10:24:45 |  3 | brush   |    12 | 2015-02-15 10:24:45 | 2015-02-15 10:44:45 |
+| 2015-02-15 10:58:00 |  4 | paint   |     3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
+| 2015-02-15 11:10:00 |  5 | paint   |     3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
index ec029ae..bd9b2ab 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
@@ -4256,36 +4256,4 @@ public abstract class EnumerableDefaults {
       }
     };
   }
-
-  /**
-   * Apply tumbling per row from the enumerable input.
-   */
-  public static <TSource, TResult> Enumerable<TResult> tumbling(
-      Enumerable<TSource> inputEnumerable,
-      Function1<TSource, TResult> outSelector) {
-    return new AbstractEnumerable<TResult>() {
-      // Applies tumbling on each element from the input enumerator and produces
-      // exactly one element for each input element.
-      @Override public Enumerator<TResult> enumerator() {
-        return new Enumerator<TResult>() {
-          Enumerator<TSource> inputs = inputEnumerable.enumerator();
-
-          public TResult current() {
-            return outSelector.apply(inputs.current());
-          }
-
-          public boolean moveNext() {
-            return inputs.moveNext();
-          }
-
-          public void reset() {
-            inputs.reset();
-          }
-
-          public void close() {
-          }
-        };
-      }
-    };
-  }
 }
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index f8aa8ec..7fd2930 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -1866,8 +1866,8 @@ Not implemented:
 |:-------------------- |:-----------
 | DESCRIPTOR(name [, name ]*) | DESCRIPTOR appears as an argument in a function to indicate a list of names. The interpretation of names is left to the function.
 
-### Table-valued functions.
-Table-valued functions occur in the `FROM` clause.
+### Table functions.
+Table functions occur in the `FROM` clause.
 
 #### TUMBLE
 In streaming queries, TUMBLE assigns a window for each row of a relation based on a timestamp column. An assigned window
@@ -1876,7 +1876,7 @@ is named as "fixed windowing".
 
 | Operator syntax      | Description
 |:-------------------- |:-----------
-| TUMBLE(table, DESCRIPTOR(column_name), interval [, time ]) | Indicates a tumbling window of *interval* for *datetime*, optionally aligned at *time*. Tumbling is applied on table in which there is a watermarked column specified by descriptor.
+| TUMBLE(table, DESCRIPTOR(datetime), interval) | Indicates a tumbling window of *interval* for *datetime*.
 
 Here is an example:
 `SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))`,
@@ -1884,20 +1884,33 @@ will apply tumbling with 1 minute window size on rows from table orders. rowtime
 watermarked column of table orders that tells data completeness.
 
 #### HOP
-In streaming queries, HOP assigns windows that cover rows within the interval of *size*, shifting every *slide*, 
-and optionally aligned at *time* based on a timestamp column. Windows assigned could have overlapping so hopping 
-sometime is named as "sliding windowing".  
+In streaming queries, HOP assigns windows that cover rows within the interval of *size* and shifting every *slide* based
+on a timestamp column. Windows assigned could have overlapping so hopping sometime is named as "sliding windowing".
 
 
 | Operator syntax      | Description
 |:-------------------- |:-----------
-| HOP(table, DESCRIPTOR(column_name), slide, size, [, time ]) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide*, and optionally aligned at *time*. Hopping is applied on table in which there is a watermarked column specified by descriptor.
+| HOP(table, DESCRIPTOR(datetime), slide, size) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide*.
 
 Here is an example:
 `SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))`,
-will apply hopping with 5-minute interval size on rows from table orders, shifting every 2 minutes. rowtime is the
+will apply hopping with 5-minute interval size on rows from table orders and shifting every 2 minutes. rowtime is the
 watermarked column of table orders that tells data completeness.
 
+#### SESSION
+In streaming queries, SESSION assigns windows that cover rows based on *datetime*. Within a session window, distances
+of rows are less than *interval*. Session window is applied per *key*.
+
+
+| Operator syntax      | Description
+|:-------------------- |:-----------
+| session(table, DESCRIPTOR(datetime), DESCRIPTOR(key), interval) | Indicates a session window of *interval* for *datetime*. Session window is applied per *key*.
+
+Here is an example:
+`SELECT * FROM TABLE(SESSION(TABLE orders, DESCRIPTOR(rowtime), DESCRIPTOR(product), INTERVAL '20' MINUTE))`,
+will apply session with 20-minute inactive gap on rows from table orders. rowtime is the
+watermarked column of table orders that tells data completeness. Session is applied per product.
+
 ### Grouped window functions
 **warning**: grouped window functions are deprecated.