Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:11:06 UTC

[5/5] flink git commit: [FLINK-6483] [table] Add materialization of time indicators.

[FLINK-6483] [table] Add materialization of time indicators.

This closes #3862.
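
For context, a minimal Table API sketch (stream contents and names assumed) of the
behavior this commit introduces: a time indicator stays a logical marker while it is
only forwarded, but is materialized into a regular SQL timestamp as soon as an
expression computes on it.

  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // assumed input with timestamps and watermarks already assigned
  val stream: DataStream[(Long, String)] = ???
  val t = stream.toTable(tEnv, 'rowtime.rowtime, 'name)

  t.select('rowtime, 'name)  // forwarded only: stays a time indicator
  t.select('name, 'rowtime > "1970-01-01 00:00:00".toTimestamp)  // computed on: materialized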


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b50ef4b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b50ef4b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b50ef4b8

Branch: refs/heads/master
Commit: b50ef4b8de73e0e19df154d87ea588236e3ccb43
Parents: 2480887
Author: twalthr <tw...@apache.org>
Authored: Wed May 10 10:11:34 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 08:09:54 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      |  39 +-
 .../calcite/RelTimeIndicatorConverter.scala     | 404 +++++++++++++------
 .../flink/table/codegen/CodeGenerator.scala     |  47 ++-
 .../flink/table/plan/nodes/CommonCalc.scala     |  13 +-
 .../table/plan/nodes/CommonCorrelate.scala      |  19 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |   6 +-
 .../plan/nodes/dataset/DataSetCorrelate.scala   |   8 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  14 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |  23 +-
 .../datastream/StreamTableSourceScan.scala      |   4 +-
 .../plan/nodes/logical/FlinkLogicalCalc.scala   |   2 +-
 .../logical/FlinkLogicalTableSourceScan.scala   |   6 +-
 .../DataStreamLogicalWindowAggregateRule.scala  |  14 +-
 .../flink/table/plan/schema/RowSchema.scala     |  11 +-
 .../plan/schema/StreamTableSourceTable.scala    |   8 +-
 .../runtime/CRowCorrelateFlatMapRunner.scala    |  83 ----
 .../runtime/CRowCorrelateProcessRunner.scala    |  91 +++++
 .../flink/table/runtime/CRowFlatMapRunner.scala |  72 ----
 .../flink/table/runtime/CRowProcessRunner.scala |  80 ++++
 .../table/sources/DefinedTimeAttributes.scala   |  60 ---
 .../table/sources/definedTimeAttributes.scala   |  60 +++
 .../stream/StreamTableEnvironmentTest.scala     |  10 +-
 .../api/scala/stream/TableSourceTest.scala      |  22 +-
 .../calcite/RelTimeIndicatorConverterTest.scala | 351 ++++++++++++++++
 .../datastream/TimeAttributesITCase.scala       | 237 +++++++++++
 .../flink/table/utils/TableTestBase.scala       |   5 +
 26 files changed, 1267 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d68da04..994ac80 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -18,38 +18,37 @@
 
 package org.apache.flink.table.api
 
-import _root_.java.util.concurrent.atomic.AtomicInteger
 import _root_.java.lang.{Boolean => JBool}
+import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.AtomicType
-import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.calcite.RelTimeIndicatorConverter
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
 import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
-import org.apache.flink.table.plan.nodes.datastream._
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
-import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
 import org.apache.flink.types.Row
 
@@ -111,6 +110,17 @@ abstract class StreamTableEnvironment(
   override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
     checkValidTableName(name)
 
+    // check if event-time is enabled
+    tableSource match {
+      case dra: DefinedRowtimeAttribute if
+          execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime =>
+
+        throw TableException(
+          s"A rowtime attribute requires an EventTime time characteristic in stream environment. " +
+            s"But is: ${execEnv.getStreamTimeCharacteristic}")
+      case _ => // ok
+    }
+
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
         registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
@@ -390,6 +400,13 @@ abstract class StreamTableEnvironment(
     // validate and extract time attributes
     val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
 
+    // check if event-time is enabled
+    if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
+      throw TableException(
+        s"A rowtime attribute requires an EventTime time characteristic in stream environment. " +
+          s"But is: ${execEnv.getStreamTimeCharacteristic}")
+    }
+
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
       fieldIndexes,
@@ -518,9 +535,9 @@ abstract class StreamTableEnvironment(
     // 3. normalize the logical plan
     val normRuleSet = getNormRuleSet
     val normalizedPlan = if (normRuleSet.iterator().hasNext) {
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet)
+      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
     } else {
-      decorPlan
+      convPlan
     }
 
     // 4. optimize the logical Flink plan
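
A hedged example of the guard added above (names assumed): registering a rowtime
attribute while the environment still runs on processing time now fails fast instead
of yielding null timestamps at runtime.

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)  // not EventTime
  val tEnv = TableEnvironment.getTableEnvironment(env)

  val stream: DataStream[(Long, String)] = ???
  // throws TableException: "A rowtime attribute requires an EventTime time
  // characteristic in stream environment. But is: ProcessingTime"
  tEnv.registerDataStream("t", stream, 'ts.rowtime, 'name)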

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index fa2e3ee..7ceb397 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -18,73 +18,43 @@
 
 package org.apache.flink.table.calcite
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
-import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
+import org.apache.calcite.rel.{RelNode, RelShuttle}
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.functions.TimeMaterializationSqlFunction
 import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 /**
   * Traverses a [[RelNode]] tree and converts fields with [[TimeIndicatorRelDataType]] type. If a
   * time attribute is accessed for a calculation, it will be materialized. Forwarding is allowed in
   * some cases, but not all.
   */
-class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl {
+class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
 
-  override def visit(project: LogicalProject): RelNode = {
-    // visit children and update inputs
-    val updatedProject = super.visit(project).asInstanceOf[LogicalProject]
+  private val timestamp = rexBuilder
+      .getTypeFactory
+      .asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
 
-    // check if input field contains time indicator type
-    // materialize field if no time indicator is present anymore
-    // if input field is already materialized, change to timestamp type
-    val materializer = new RexTimeIndicatorMaterializer(
-      rexBuilder,
-      updatedProject.getInput.getRowType.getFieldList.map(_.getType))
-    val newProjects = updatedProject.getProjects.map(_.accept(materializer))
-
-    // copy project
-    updatedProject.copy(
-      updatedProject.getTraitSet,
-      updatedProject.getInput,
-      newProjects,
-      buildRowType(updatedProject.getRowType.getFieldNames, newProjects.map(_.getType))
-    )
-  }
-
-  override def visit(filter: LogicalFilter): RelNode = {
-    // visit children and update inputs
-    val updatedFilter = super.visit(filter).asInstanceOf[LogicalFilter]
-
-    // check if input field contains time indicator type
-    // materialize field if no time indicator is present anymore
-    // if input field is already materialized, change to timestamp type
-    val materializer = new RexTimeIndicatorMaterializer(
-      rexBuilder,
-      updatedFilter.getInput.getRowType.getFieldList.map(_.getType))
-    val newCondition = updatedFilter.getCondition.accept(materializer)
-
-    // copy filter
-    updatedFilter.copy(
-      updatedFilter.getTraitSet,
-      updatedFilter.getInput,
-      newCondition
-    )
-  }
+  override def visit(intersect: LogicalIntersect): RelNode =
+    throw new TableException("Logical intersect in a stream environment is not supported yet.")
 
   override def visit(union: LogicalUnion): RelNode = {
     // visit children and update inputs
-    val updatedUnion = super.visit(union).asInstanceOf[LogicalUnion]
+    val inputs = union.getInputs.map(_.accept(this))
 
     // make sure that time indicator types match
-    val inputTypes = updatedUnion.getInputs.map(_.getRowType)
+    val inputTypes = inputs.map(_.getRowType)
 
     val head = inputTypes.head.getFieldList.map(_.getType)
 
@@ -114,101 +84,269 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl {
         "Union fields with time attributes have different types.")
     }
 
-    updatedUnion
+    LogicalUnion.create(inputs, union.all)
   }
 
+  override def visit(aggregate: LogicalAggregate): RelNode = convertAggregate(aggregate)
+
+  override def visit(minus: LogicalMinus): RelNode =
+    throw new TableException("Logical minus in a stream environment is not supported yet.")
+
+  override def visit(sort: LogicalSort): RelNode =
+    throw new TableException("Logical sort in a stream environment is not supported yet.")
+
+  override def visit(`match`: LogicalMatch): RelNode =
+    throw new TableException("Logical match in a stream environment is not supported yet.")
+
   override def visit(other: RelNode): RelNode = other match {
-    case scan: LogicalTableFunctionScan if
-        stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] =>
+
+    case uncollect: Uncollect =>
       // visit children and update inputs
-      val updatedScan = super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
-
-      val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
-
-      // check if input field contains time indicator type
-      // materialize field if no time indicator is present anymore
-      // if input field is already materialized, change to timestamp type
-      val materializer = new RexTimeIndicatorMaterializer(
-        rexBuilder,
-        correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType))
-      val newCall = updatedScan.getCall.accept(materializer)
-
-      // copy scan
-      updatedScan.copy(
-        updatedScan.getTraitSet,
-        updatedScan.getInputs,
-        newCall,
-        updatedScan.getElementType,
-        updatedScan.getRowType,
-        updatedScan.getColumnMappings
-      )
+      val input = uncollect.getInput.accept(this)
+      Uncollect.create(uncollect.getTraitSet, input, uncollect.withOrdinality)
+
+    case scan: LogicalTableFunctionScan =>
+      scan
+
+    case aggregate: LogicalWindowAggregate =>
+      val convAggregate = convertAggregate(aggregate)
+
+      LogicalWindowAggregate.create(
+        aggregate.getWindow,
+        aggregate.getNamedProperties,
+        convAggregate)
 
     case _ =>
-      super.visit(other)
+      throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}")
   }
 
-  private def buildRowType(names: Seq[String], types: Seq[RelDataType]): RelDataType = {
-    val fields = names.zipWithIndex.map { case (name, idx) =>
-      new RelDataTypeFieldImpl(name, idx, types(idx))
-    }
-    new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
+
+  override def visit(exchange: LogicalExchange): RelNode =
+    throw new TableException("Logical exchange in a stream environment is not supported yet.")
+
+  override def visit(scan: TableScan): RelNode = scan
+
+  override def visit(scan: TableFunctionScan): RelNode =
+    throw new TableException("Table function scan in a stream environment is not supported yet.")
+
+  override def visit(values: LogicalValues): RelNode = values
+
+  override def visit(filter: LogicalFilter): RelNode = {
+    // visit children and update inputs
+    val input = filter.getInput.accept(this)
+
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      input.getRowType.getFieldList.map(_.getType))
+
+    val condition = filter.getCondition.accept(materializer)
+    LogicalFilter.create(input, condition)
   }
-}
 
-class RexTimeIndicatorMaterializer(
-    private val rexBuilder: RexBuilder,
-    private val input: Seq[RelDataType])
-  extends RexShuttle {
-
-  val timestamp = rexBuilder
-    .getTypeFactory
-    .asInstanceOf[FlinkTypeFactory]
-    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode = {
-    // reference is interesting
-    if (isTimeIndicatorType(inputRef.getType)) {
-      val resolvedRefType = input(inputRef.getIndex)
-      // input is a valid time indicator
-      if (isTimeIndicatorType(resolvedRefType)) {
-        inputRef
-      }
-      // input has been materialized
-      else {
-        new RexInputRef(inputRef.getIndex, resolvedRefType)
-      }
-    }
-    // reference is a regular field
-    else {
-      super.visitInputRef(inputRef)
+  override def visit(project: LogicalProject): RelNode = {
+    // visit children and update inputs
+    val input = project.getInput.accept(this)
+
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      input.getRowType.getFieldList.map(_.getType))
+
+    val projects = project.getProjects.map(_.accept(materializer))
+    val fieldNames = project.getRowType.getFieldNames
+    LogicalProject.create(input, projects, fieldNames)
+  }
+
+  override def visit(join: LogicalJoin): RelNode =
+    throw new TableException("Logical join in a stream environment is not supported yet.")
+
+  override def visit(correlate: LogicalCorrelate): RelNode = {
+    // visit children and update inputs
+    val inputs = correlate.getInputs.map(_.accept(this))
+
+    val right = inputs(1) match {
+      case scan: LogicalTableFunctionScan =>
+        // visit children and update inputs
+        val scanInputs = scan.getInputs.map(_.accept(this))
+
+        // check if input field contains time indicator type
+        // materialize field if no time indicator is present anymore
+        // if input field is already materialized, change to timestamp type
+        val materializer = new RexTimeIndicatorMaterializer(
+          rexBuilder,
+          inputs.head.getRowType.getFieldList.map(_.getType))
+
+        val call = scan.getCall.accept(materializer)
+        LogicalTableFunctionScan.create(
+          scan.getCluster,
+          scanInputs,
+          call,
+          scan.getElementType,
+          scan.getRowType,
+          scan.getColumnMappings)
+
+      case _ =>
+        inputs(1)
     }
+
+    LogicalCorrelate.create(
+      inputs.head,
+      right,
+      correlate.getCorrelationId,
+      correlate.getRequiredColumns,
+      correlate.getJoinType)
   }
 
-  override def visitCall(call: RexCall): RexNode = {
-    val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+  private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
+    // visit children and update inputs
+    val input = aggregate.getInput.accept(this)
+
+    // add a project to materialize aggregation arguments/grouping keys
+
+    val refIndices = mutable.Set[Int]()
+
+    // check arguments of agg calls
+    aggregate.getAggCallList.foreach(call => if (call.getArgList.size() == 0) {
+        // count(*) has an empty argument list
+        (0 until input.getRowType.getFieldCount).foreach(refIndices.add)
+      } else {
+        // for other aggregations
+        call.getArgList.map(_.asInstanceOf[Int]).foreach(refIndices.add)
+      })
 
-    // skip materialization for special operators
-    updatedCall.getOperator match {
-      case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
-        return updatedCall
+    // check grouping sets
+    aggregate.getGroupSets.foreach(set =>
+      set.asList().map(_.asInstanceOf[Int]).foreach(refIndices.add)
+    )
 
-      case _ => // do nothing
+    val needsMaterialization = refIndices.exists(idx =>
+      isTimeIndicatorType(input.getRowType.getFieldList.get(idx).getType))
+
+    // create project if necessary
+    val projectedInput = if (needsMaterialization) {
+
+      // insert or merge with input project if
+      // a time attribute is accessed and needs to be materialized
+      input match {
+
+        // merge
+        case lp: LogicalProject =>
+          val projects = lp.getProjects.zipWithIndex.map { case (expr, idx) =>
+            if (isTimeIndicatorType(expr.getType) && refIndices.contains(idx)) {
+              rexBuilder.makeCall(
+                TimeMaterializationSqlFunction,
+                expr)
+            } else {
+              expr
+            }
+          }
+
+          LogicalProject.create(
+            lp.getInput,
+            projects,
+            input.getRowType.getFieldNames)
+
+        // new project
+        case _ =>
+          val projects = input.getRowType.getFieldList.map { field =>
+            if (isTimeIndicatorType(field.getType) && refIndices.contains(field.getIndex)) {
+              rexBuilder.makeCall(
+                TimeMaterializationSqlFunction,
+                new RexInputRef(field.getIndex, field.getType))
+            } else {
+              new RexInputRef(field.getIndex, field.getType)
+            }
+          }
+
+          LogicalProject.create(
+            input,
+            projects,
+            input.getRowType.getFieldNames)
+      }
+    } else {
+      // no project necessary
+      input
     }
 
-    // materialize operands with time indicators
-    val materializedOperands = updatedCall.getOperands.map { o =>
-      if (isTimeIndicatorType(o.getType)) {
-        rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+    // remove time indicator type as agg call return type
+    val updatedAggCalls = aggregate.getAggCallList.map { call =>
+      val callType = if (isTimeIndicatorType(call.getType)) {
+        timestamp
       } else {
-        o
+        call.getType
       }
+      AggregateCall.create(
+        call.getAggregation,
+        call.isDistinct,
+        call.getArgList,
+        call.filterArg,
+        callType,
+        call.name)
     }
 
-    // remove time indicator return type
-    if (isTimeIndicatorType(updatedCall.getType)) {
-      updatedCall.clone(timestamp, materializedOperands)
-    } else {
-      updatedCall.clone(updatedCall.getType, materializedOperands)
+    LogicalAggregate.create(
+      projectedInput,
+      aggregate.indicator,
+      aggregate.getGroupSet,
+      aggregate.getGroupSets,
+      updatedAggCalls)
+  }
+
+  class RexTimeIndicatorMaterializer(
+      private val rexBuilder: RexBuilder,
+      private val input: Seq[RelDataType])
+    extends RexShuttle {
+
+    override def visitInputRef(inputRef: RexInputRef): RexNode = {
+      // reference is interesting
+      if (isTimeIndicatorType(inputRef.getType)) {
+        val resolvedRefType = input(inputRef.getIndex)
+        // input is a valid time indicator
+        if (isTimeIndicatorType(resolvedRefType)) {
+          inputRef
+        }
+        // input has been materialized
+        else {
+          new RexInputRef(inputRef.getIndex, resolvedRefType)
+        }
+      }
+      // reference is a regular field
+      else {
+        super.visitInputRef(inputRef)
+      }
+    }
+
+    override def visitCall(call: RexCall): RexNode = {
+      val updatedCall = super.visitCall(call).asInstanceOf[RexCall]
+
+      // materialize operands with time indicators
+      val materializedOperands = updatedCall.getOperator match {
+
+        // skip materialization for special operators
+        case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE =>
+          updatedCall.getOperands.toList
+
+        case _ =>
+          updatedCall.getOperands.map { o =>
+            if (isTimeIndicatorType(o.getType)) {
+              rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+            } else {
+              o
+            }
+          }
+      }
+
+      // remove time indicator return type
+      if (isTimeIndicatorType(updatedCall.getType)) {
+        updatedCall.clone(timestamp, materializedOperands)
+      } else {
+        updatedCall.clone(updatedCall.getType, materializedOperands)
+      }
     }
   }
 }
@@ -217,6 +355,30 @@ object RelTimeIndicatorConverter {
 
   def convert(rootRel: RelNode, rexBuilder: RexBuilder): RelNode = {
     val converter = new RelTimeIndicatorConverter(rexBuilder)
-    rootRel.accept(converter)
+    val convertedRoot = rootRel.accept(converter)
+
+    var needsConversion = false
+
+    // materialize all remaining time indicators
+    val projects = convertedRoot.getRowType.getFieldList.map(field =>
+      if (isTimeIndicatorType(field.getType)) {
+        needsConversion = true
+        rexBuilder.makeCall(
+          TimeMaterializationSqlFunction,
+          new RexInputRef(field.getIndex, field.getType))
+      } else {
+        new RexInputRef(field.getIndex, field.getType)
+      }
+    )
+
+    // add final conversion
+    if (needsConversion) {
+      LogicalProject.create(
+      convertedRoot,
+      projects,
+      convertedRoot.getRowType.getFieldNames)
+    } else {
+      convertedRoot
+    }
   }
 }
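
Two effects of the rewritten converter, illustrated with hedged SQL examples (a table
"t" with time attribute "rowtime" assumed): convertAggregate() inserts, or merges into,
a projection that materializes a time attribute used as a grouping key or aggregation
argument, and convert() appends a final projection for any indicator that survives
into the query result.

  // grouping on a time indicator: a LogicalProject wrapping 'rowtime' in
  // TimeMaterializationSqlFunction is placed below the aggregation
  val agg = tEnv.sql("SELECT name, COUNT(*) FROM t GROUP BY name, rowtime")

  // the indicator reaches the root: a trailing LogicalProject converts it
  // into a plain TIMESTAMP before the result leaves the Table API
  val res = tEnv.sql("SELECT name, rowtime FROM t")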

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 25addbc..036889f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
@@ -238,6 +239,11 @@ class CodeGenerator(
   var outRecordTerm = "out"
 
   /**
+    * @return term of the [[ProcessFunction]]'s context
+    */
+  var contextTerm = "ctx"
+
+  /**
     * @return returns if null checking is enabled
     */
   def nullCheck: Boolean = config.getNullCheck
@@ -699,6 +705,17 @@ class CodeGenerator(
           List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
                s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
       }
+
+      // ProcessFunction
+      else if (clazz == classOf[ProcessFunction[_, _]]) {
+        val baseClass = classOf[ProcessFunction[_, _]]
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+        (baseClass,
+          s"void processElement(Object _in1, " +
+            s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," +
+            s"org.apache.flink.util.Collector $collectorTerm)",
+          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+      }
       else {
         // TODO more functions
         throw new CodeGenException("Unsupported Function.")
@@ -1312,9 +1329,11 @@ class CodeGenerator(
     throw new CodeGenException("Dynamic parameter references are not supported yet.")
 
   override def visitCall(call: RexCall): GeneratedExpression = {
-    // time materialization is not implemented yet
+    // special case: time materialization
     if (call.getOperator == TimeMaterializationSqlFunction) {
-      throw new CodeGenException("Access to time attributes is not possible yet.")
+      return generateRecordTimestamp(
+        FlinkTypeFactory.isRowtimeIndicatorType(call.getOperands.get(0).getType)
+      )
     }
 
     val operands = call.getOperands.map(_.accept(this))
@@ -1840,6 +1859,30 @@ class CodeGenerator(
     }
   }
 
+  private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = {
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+    val resultCode = if (isEventTime) {
+      s"""
+        |$resultTypeTerm $resultTerm;
+        |if ($contextTerm.timestamp() == null) {
+        |  throw new RuntimeException("Rowtime timestamp is null. Please make sure that a proper " +
+        |    "TimestampAssigner is defined and the stream environment uses the EventTime time " +
+        |    "characteristic.");
+        |}
+        |else {
+        |  $resultTerm = $contextTerm.timestamp();
+        |}
+        |""".stripMargin
+    } else {
+      s"""
+        |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime();
+        |""".stripMargin
+    }
+    GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Reusable code snippets
   // ----------------------------------------------------------------------------------------------
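
Hand-written for illustration, roughly the logic that generateRecordTimestamp() emits
into the generated ProcessFunction (the real code is assembled as Java source strings;
row copying is elided here):

  import org.apache.flink.streaming.api.functions.ProcessFunction
  import org.apache.flink.table.runtime.types.CRow
  import org.apache.flink.util.Collector

  class MaterializeTimestamp extends ProcessFunction[CRow, CRow] {
    override def processElement(
        in: CRow,
        ctx: ProcessFunction[CRow, CRow]#Context,
        out: Collector[CRow]): Unit = {
      if (ctx.timestamp() == null) {
        throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
          "proper TimestampAssigner is defined and the stream environment uses the " +
          "EventTime time characteristic.")
      }
      val result: Long = ctx.timestamp()  // event time; processing time would use
                                          // ctx.timerService().currentProcessingTime()
      // ... write 'result' into the output row where the indicator was ...
      out.collect(in)
    }
  }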

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index e875587..9b486e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
 import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.Function
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
@@ -30,16 +30,17 @@ import org.apache.flink.types.Row
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait CommonCalc[T] {
+trait CommonCalc {
 
-  private[flink] def generateFunction(
+  private[flink] def generateFunction[T <: Function](
       generator: CodeGenerator,
       ruleDescription: String,
       inputSchema: RowSchema,
       returnSchema: RowSchema,
       calcProgram: RexProgram,
-      config: TableConfig):
-    GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+      config: TableConfig,
+      functionClass: Class[T]):
+    GeneratedFunction[T, Row] = {
 
     val expandedExpressions = calcProgram
       .getProjectList
@@ -92,7 +93,7 @@ trait CommonCalc[T] {
 
     generator.generateFunction(
       ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
+      functionClass,
       body,
       returnSchema.physicalTypeInfo)
   }
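
The new type parameter lets one generator serve both backends; the DataSetCalc and
DataStreamCalc hunks further down pass the concrete interface, roughly:

  // batch: the calc program is compiled into a FlatMapFunction over Rows
  generateFunction(generator, ruleDescription, inputSchema, returnSchema,
    calcProgram, config, classOf[FlatMapFunction[Row, Row]])

  // streaming: the same program becomes a ProcessFunction over CRows, whose
  // Context supplies the timestamps needed to materialize time indicators
  generateFunction(generator, ruleDescription, inputSchema, returnSchema,
    calcProgram, config, classOf[ProcessFunction[CRow, CRow]])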

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index c95f2f7..874bea2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
 import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
@@ -36,22 +36,22 @@ import scala.collection.JavaConverters._
 /**
   * Join a user-defined table function
   */
-trait CommonCorrelate[T] {
+trait CommonCorrelate {
 
   /**
     * Generates the flat map function to run the user-defined table function.
     */
-  private[flink] def generateFunction(
+  private[flink] def generateFunction[T <: Function](
     config: TableConfig,
     inputSchema: RowSchema,
     udtfTypeInfo: TypeInformation[Any],
     returnSchema: RowSchema,
-    rowType: RelDataType,
     joinType: SemiJoinType,
     rexCall: RexCall,
     pojoFieldMapping: Option[Array[Int]],
-    ruleDescription: String):
-  GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+    ruleDescription: String,
+    functionClass: Class[T]):
+  GeneratedFunction[T, Row] = {
 
     val functionGenerator = new CodeGenerator(
       config,
@@ -89,7 +89,7 @@ trait CommonCorrelate[T] {
       val outerResultExpr = functionGenerator.generateResultExpression(
         input1AccessExprs ++ input2NullExprs,
         returnSchema.physicalTypeInfo,
-        rowType.getFieldNames.asScala)
+        returnSchema.physicalFieldNames)
       body +=
         s"""
            |boolean hasOutput = $collectorTerm.isCollected();
@@ -104,7 +104,7 @@ trait CommonCorrelate[T] {
 
     functionGenerator.generateFunction(
       ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
+      functionClass,
       body,
       returnSchema.physicalTypeInfo)
   }
@@ -117,7 +117,6 @@ trait CommonCorrelate[T] {
     inputSchema: RowSchema,
     udtfTypeInfo: TypeInformation[Any],
     returnSchema: RowSchema,
-    rowType: RelDataType,
     condition: Option[RexNode],
     pojoFieldMapping: Option[Array[Int]])
   : GeneratedCollector = {
@@ -135,7 +134,7 @@ trait CommonCorrelate[T] {
     val crossResultExpr = generator.generateResultExpression(
       input1AccessExprs ++ input2AccessExprs,
       returnSchema.physicalTypeInfo,
-      rowType.getFieldNames.asScala)
+      returnSchema.physicalFieldNames)
 
     val collectorCode = if (condition.isEmpty) {
       s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index e340a8c..9a9f738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
@@ -46,7 +47,7 @@ class DataSetCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc[Row]
+  with CommonCalc
   with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -95,7 +96,8 @@ class DataSetCalc(
       new RowSchema(getInput.getRowType),
       new RowSchema(getRowType),
       calcProgram,
-      config)
+      config,
+      classOf[FlatMapFunction[Row, Row]])
 
     val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 49ead26..731d2e5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -49,7 +50,7 @@ class DataSetCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with CommonCorrelate[Row]
+  with CommonCorrelate
   with DataSetRel {
 
   override def deriveRowType() = relRowType
@@ -109,18 +110,17 @@ class DataSetCorrelate(
       new RowSchema(getInput.getRowType),
       udtfTypeInfo,
       new RowSchema(getRowType),
-      rowType,
       joinType,
       rexCall,
       pojoFieldMapping,
-      ruleDescription)
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]])
 
     val collector = generateCollector(
       config,
       new RowSchema(getInput.getRowType),
       udtfTypeInfo,
       new RowSchema(getRowType),
-      rowType,
       condition,
       pojoFieldMapping)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 5f270f6..f75efc8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -25,11 +25,12 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
-import org.apache.flink.table.runtime.CRowFlatMapRunner
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
@@ -45,7 +46,7 @@ class DataStreamCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc[CRow]
+  with CommonCalc
   with DataStreamRel {
 
   override def deriveRowType(): RelDataType = schema.logicalType
@@ -101,17 +102,18 @@ class DataStreamCalc(
       inputSchema,
       schema,
       calcProgram,
-      config)
+      config,
+      classOf[ProcessFunction[CRow, CRow]])
 
     val inputParallelism = inputDataStream.getParallelism
 
-    val mapFunc = new CRowFlatMapRunner(
+    val processFunc = new CRowProcessRunner(
       genFunction.name,
       genFunction.code,
       CRowTypeInfo(schema.physicalTypeInfo))
 
     inputDataStream
-      .flatMap(mapFunc)
+      .process(processFunc)
       .name(calcOpName(calcProgram, getExpressionString))
       // keep parallelism to ensure order of accumulate and retract messages
       .setParallelism(inputParallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 5b32b10..b7165cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -23,12 +23,13 @@ import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner
+import org.apache.flink.table.runtime.CRowCorrelateProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
@@ -46,7 +47,7 @@ class DataStreamCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with CommonCorrelate[CRow]
+  with CommonCorrelate
   with DataStreamRel {
 
   override def deriveRowType() = schema.logicalType
@@ -90,7 +91,6 @@ class DataStreamCorrelate(
 
     // we do not need to specify input type
     val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
-    val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
@@ -98,37 +98,36 @@ class DataStreamCorrelate(
     val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val flatMap = generateFunction(
+    val process = generateFunction(
       config,
       inputSchema,
       udtfTypeInfo,
       schema,
-      getRowType,
       joinType,
       rexCall,
       pojoFieldMapping,
-      ruleDescription)
+      ruleDescription,
+      classOf[ProcessFunction[CRow, CRow]])
 
     val collector = generateCollector(
       config,
       inputSchema,
       udtfTypeInfo,
       schema,
-      getRowType,
       condition,
       pojoFieldMapping)
 
-    val mapFunc = new CRowCorrelateFlatMapRunner(
-      flatMap.name,
-      flatMap.code,
+    val processFunc = new CRowCorrelateProcessRunner(
+      process.name,
+      process.code,
       collector.name,
       collector.code,
-      CRowTypeInfo(flatMap.returnType))
+      CRowTypeInfo(process.returnType))
 
     val inputParallelism = inputDS.getParallelism
 
     inputDS
-      .flatMap(mapFunc)
+      .process(processFunc)
       // preserve input parallelism to ensure that acc and retract messages remain in order
       .setParallelism(inputParallelism)
       .name(correlateOpName(rexCall, sqlFunction, schema.logicalType))
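
A hedged end-to-end example of this correlate path (MySplit is a hypothetical
TableFunction[String]): the table function join is now translated into a single
process() operator via CRowCorrelateProcessRunner.

  val split = new MySplit()  // hypothetical: class MySplit extends TableFunction[String]
  val result = t
    .join(split('text) as 'word)
    .select('name, 'word)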

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 51e609f..72ecac5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -49,7 +49,7 @@ class StreamTableSourceScan(
     val fieldCnt = fieldNames.length
 
     val rowtime = tableSource match {
-      case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
         Some((fieldCnt, rowtimeAttribute))
       case _ =>
@@ -57,7 +57,7 @@ class StreamTableSourceScan(
     }
 
     val proctime = tableSource match {
-      case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
         val proctimeAttribute = timeSource.getProctimeAttribute
         Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
       case _ =>
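
For reference, a minimal sketch of a source implementing the renamed interface
(DefinedRowTimeAttribute is now DefinedRowtimeAttribute); stream contents and return
type are assumed:

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.streaming.api.datastream.DataStream
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource}
  import org.apache.flink.types.Row

  class MySource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
    override def getRowtimeAttribute: String = "rowtime"  // appended rowtime field
    override def getDataStream(env: StreamExecutionEnvironment): DataStream[Row] = ???
    override def getReturnType: TypeInformation[Row] = ???
  }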

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
index 0ca079e..ec90392 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -34,7 +34,7 @@ class FlinkLogicalCalc(
     calcProgram: RexProgram)
   extends Calc(cluster, traitSet, input, calcProgram)
   with FlinkLogicalRel
-  with CommonCalc[Any] {
+  with CommonCalc {
 
   override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
     new FlinkLogicalCalc(cluster, traitSet, child, program)

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index a2777ec..3ae949e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
 
 import scala.collection.JavaConverters._
 
@@ -54,7 +54,7 @@ class FlinkLogicalTableSourceScan(
     val fieldCnt = fieldNames.length
 
     val rowtime = tableSource match {
-      case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
         Some((fieldCnt, rowtimeAttribute))
       case _ =>
@@ -62,7 +62,7 @@ class FlinkLogicalTableSourceScan(
     }
 
     val proctime = tableSource match {
-      case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
         val proctimeAttribute = timeSource.getProctimeAttribute
         Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
       case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 28efcf5..d57d4cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.{TableException, Window}
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, UnresolvedFieldReference}
 import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
@@ -68,10 +68,12 @@ class DataStreamLogicalWindowAggregateRule
         case _ => throw new TableException("Only constant window descriptors are supported.")
       }
 
-    def getOperandAsTimeIndicator(call: RexCall, idx: Int): String =
+    def getOperandAsTimeIndicator(call: RexCall, idx: Int): ResolvedFieldReference =
       call.getOperands.get(idx) match {
         case v: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(v.getType) =>
-          rowType.getFieldList.get(v.getIndex).getName
+          ResolvedFieldReference(
+            rowType.getFieldList.get(v.getIndex).getName,
+            FlinkTypeFactory.toTypeInfo(v.getType))
         case _ =>
           throw new TableException("Window can only be defined over a time attribute column.")
       }
@@ -82,7 +84,7 @@ class DataStreamLogicalWindowAggregateRule
         val interval = getOperandAsLong(windowExpr, 1)
         val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(UnresolvedFieldReference(time)).as("w$")
+        w.on(time).as("w$")
 
       case SqlStdOperatorTable.HOP =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
@@ -91,14 +93,14 @@ class DataStreamLogicalWindowAggregateRule
           .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
           .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(UnresolvedFieldReference(time)).as("w$")
+        w.on(time).as("w$")
 
       case SqlStdOperatorTable.SESSION =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
         val gap = getOperandAsLong(windowExpr, 1)
         val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(UnresolvedFieldReference(time)).as("w$")
+        w.on(time).as("w$")
     }
   }
 }
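
The user-facing effect: a group window defined over a time attribute now carries a
typed field reference instead of a bare field name. The equivalent Table API form
(names assumed, with the usual org.apache.flink.table.api.scala._ imports):

  t.window(Tumble over 10.minutes on 'rowtime as 'w)
    .groupBy('w, 'name)
    .select('name, 'w.end, 'amount.sum)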

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
index b42be82..ccbe44d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
@@ -20,11 +20,12 @@ package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType}
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.TimeMaterializationSqlFunction
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
@@ -76,6 +77,14 @@ class RowSchema(private val logicalRowType: RelDataType) {
     override def visitInputRef(inputRef: RexInputRef): RexNode = {
       new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType)
     }
+
+    override def visitCall(call: RexCall): RexNode = call.getOperator match {
+      // we leave time indicators unchanged yet
+      // the index becomes invalid but right now we are only
+      // interested in the type of the input reference
+      case TimeMaterializationSqlFunction => call
+      case _ => super.visitCall(call)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index 75deca5..fa15288 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
 
 class StreamTableSourceTable[T](
     override val tableSource: TableSource[T],
@@ -39,7 +39,7 @@ class StreamTableSourceTable[T](
     val fieldCnt = fieldNames.length
 
     val rowtime = tableSource match {
-      case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
         Some((fieldCnt, rowtimeAttribute))
       case _ =>
@@ -47,7 +47,7 @@ class StreamTableSourceTable[T](
     }
 
     val proctime = tableSource match {
-      case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
         val proctimeAttribute = timeSource.getProctimeAttribute
         Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
       case _ =>

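A small worked example of the index arithmetic above, assuming a source with three physical fields that defines both time attributes (field names are hypothetical):

val fieldCnt = 3                              // physical fields: id, val, name
val rowtime  = Some((fieldCnt, "rtime"))      // appended first -> index 3
val proctime = Some((fieldCnt + 1, "ptime"))  // shifted by the rowtime field -> index 4
// resulting table schema: id, val, name, rtime, ptime
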
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
deleted file mode 100644
index ff3821a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.functions.util.FunctionUtils
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
-  * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output.
-  */
-class CRowCorrelateFlatMapRunner(
-    flatMapName: String,
-    flatMapCode: String,
-    collectorName: String,
-    collectorCode: String,
-    @transient var returnType: TypeInformation[CRow])
-  extends RichFlatMapFunction[CRow, CRow]
-  with ResultTypeQueryable[CRow]
-  with Compiler[Any] {
-
-  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
-
-  private var function: FlatMapFunction[Row, Row] = _
-  private var collector: TableFunctionCollector[_] = _
-  private var cRowWrapper: CRowWrappingCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
-    LOG.debug("Instantiating TableFunctionCollector.")
-    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
-    this.cRowWrapper = new CRowWrappingCollector()
-
-    LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n Code:\n$flatMapCode")
-    val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, flatMapName, flatMapCode)
-    val constructor = flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]])
-    LOG.debug("Instantiating FlatMapFunction.")
-    function = constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]]
-    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
-    FunctionUtils.openFunction(function, parameters)
-  }
-
-  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
-    cRowWrapper.out = out
-    cRowWrapper.setChange(in.change)
-
-    collector.setCollector(cRowWrapper)
-    collector.setInput(in.row)
-    collector.reset()
-
-    function.flatMap(in.row, cRowWrapper)
-  }
-
-  override def getProducedType: TypeInformation[CRow] = returnType
-
-  override def close(): Unit = {
-    FunctionUtils.closeFunction(function)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
new file mode 100644
index 0000000..4f0a785
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * A CorrelateProcessRunner with [[CRow]] input and [[CRow]] output.
+  */
+class CRowCorrelateProcessRunner(
+    processName: String,
+    processCode: String,
+    collectorName: String,
+    collectorCode: String,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[Any] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private var function: ProcessFunction[Row, Row] = _
+  private var collector: TableFunctionCollector[_] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
+    LOG.debug("Instantiating TableFunctionCollector.")
+    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
+    this.cRowWrapper = new CRowWrappingCollector()
+
+    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
+    val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
+    val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
+    LOG.debug("Instantiating ProcessFunction.")
+    function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+  }
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow])
+    : Unit = {
+
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+
+    collector.setCollector(cRowWrapper)
+    collector.setInput(in.row)
+    collector.reset()
+
+    function.processElement(
+      in.row,
+      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
+      cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}

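A hedged sketch of how the planner side might wire this runner into a stream; the generated names and code strings are placeholders for what the code generator actually emits, and the method is purely illustrative:

import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.runtime.CRowCorrelateProcessRunner
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}

// placeholders: in the real translation these come from the code generator
def translateCorrelate(
    input: DataStream[CRow],
    processName: String, processCode: String,
    collectorName: String, collectorCode: String,
    returnType: CRowTypeInfo): DataStream[CRow] = {

  val runner = new CRowCorrelateProcessRunner(
    processName, processCode, collectorName, collectorCode, returnType)
  // a ProcessFunction replaces the former FlatMapFunction so that generated
  // code can reach the timer service, e.g. to materialize time indicators
  input.process(runner)
}
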
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
deleted file mode 100644
index 9701cb9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.api.common.functions.util.FunctionUtils
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
-
-/**
-  * FlatMapRunner with [[CRow]] input and [[CRow]] output.
-  */
-class CRowFlatMapRunner(
-    name: String,
-    code: String,
-    @transient var returnType: TypeInformation[CRow])
-  extends RichFlatMapFunction[CRow, CRow]
-  with ResultTypeQueryable[CRow]
-  with Compiler[FlatMapFunction[Row, Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: FlatMapFunction[Row, Row] = _
-  private var cRowWrapper: CRowWrappingCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating FlatMapFunction.")
-    function = clazz.newInstance()
-    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
-    FunctionUtils.openFunction(function, parameters)
-
-    this.cRowWrapper = new CRowWrappingCollector()
-  }
-
-  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
-    cRowWrapper.out = out
-    cRowWrapper.setChange(in.change)
-    function.flatMap(in.row, cRowWrapper)
-  }
-
-  override def getProducedType: TypeInformation[CRow] = returnType
-
-  override def close(): Unit = {
-    FunctionUtils.closeFunction(function)
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
new file mode 100644
index 0000000..cef62a5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * ProcessRunner with [[CRow]] input and [[CRow]] output.
+  */
+class CRowProcessRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[ProcessFunction[Row, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: ProcessFunction[Row, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating ProcessFunction.")
+    function = clazz.newInstance()
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+
+    this.cRowWrapper = new CRowWrappingCollector()
+  }
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow])
+    : Unit = {
+
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+    function.processElement(
+      in.row,
+      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
+      cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}
+
+

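For illustration, a sketch of the kind of generated function such a runner compiles. The body is hypothetical and simplified, but it shows why a ProcessFunction is needed here: the context's timer service is what lets generated code materialize a processing-time indicator into a concrete timestamp:

import java.sql.Timestamp
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// hypothetical stand-in for code-generated output
class ExampleGeneratedCalc extends ProcessFunction[Row, Row] {

  override def processElement(
      in: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {

    val result = new Row(2)
    result.setField(0, in.getField(0))
    // materialize the proctime indicator using the processing-time clock
    result.setField(1, new Timestamp(ctx.timerService().currentProcessingTime()))
    out.collect(result)
  }
}
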
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
deleted file mode 100644
index 6d87663..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-/**
-  * Defines a logical event-time attribute for a [[TableSource]].
-  * The event-time attribute can be used for indicating, accessing, and working with Flink's
-  * event-time.
-  *
-  * A [[TableSource]] that implements this interface defines the name of
-  * the event-time attribute. The attribute will be added to the schema of the
-  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
-  */
-trait DefinedRowTimeAttribute {
-
-  /**
-    * Defines a name of the event-time attribute that represents Flink's
-    * event-time. Null if no rowtime should be available.
-    *
-    * The field will be appended to the schema provided by the [[TableSource]].
-    */
-  def getRowtimeAttribute: String
-}
-
-/**
-  * Defines a logical processing-time attribute for a [[TableSource]].
-  * The processing-time attribute can be used for indicating, accessing, and working with Flink's
-  * processing-time.
-  *
-  * A [[TableSource]] that implements this interface defines the name of
-  * the processing-time attribute. The attribute will be added to the schema of the
-  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
-  */
-trait DefinedProcTimeAttribute {
-
-  /**
-    * Defines a name of the processing-time attribute that represents Flink's
-    * processing-time. Null if no rowtime should be available.
-    *
-    * The field will be appended to the schema provided by the [[TableSource]].
-    */
-  def getProctimeAttribute: String
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
new file mode 100644
index 0000000..d381115
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+  * Defines a logical event-time attribute for a [[TableSource]].
+  * The event-time attribute can be used for indicating, accessing, and working with Flink's
+  * event-time.
+  *
+  * A [[TableSource]] that implements this interface defines the name of
+  * the event-time attribute. The attribute will be added to the schema of the
+  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+  */
+trait DefinedRowtimeAttribute {
+
+  /**
+    * Defines the name of the event-time attribute that represents Flink's
+    * event-time. Returns null if no rowtime attribute should be available.
+    *
+    * The field will be appended to the schema provided by the [[TableSource]].
+    */
+  def getRowtimeAttribute: String
+}
+
+/**
+  * Defines a logical processing-time attribute for a [[TableSource]].
+  * The processing-time attribute can be used for indicating, accessing, and working with Flink's
+  * processing-time.
+  *
+  * A [[TableSource]] that implements this interface defines the name of
+  * the processing-time attribute. The attribute will be added to the schema of the
+  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+  */
+trait DefinedProctimeAttribute {
+
+  /**
+    * Defines the name of the processing-time attribute that represents Flink's
+    * processing-time. Returns null if no proctime attribute should be available.
+    *
+    * The field will be appended to the schema provided by the [[TableSource]].
+    */
+  def getProctimeAttribute: String
+
+}

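A minimal sketch of a StreamTableSource implementing both traits; the schema and attribute names are illustrative, and getDataStream is left unimplemented as in the tests below:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Types
import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.types.Row

class ExampleTimedSource extends StreamTableSource[Row]
    with DefinedRowtimeAttribute with DefinedProctimeAttribute {

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???

  // physical schema: (id, name); "rtime" and "ptime" are appended logically
  override def getReturnType: TypeInformation[Row] =
    new RowTypeInfo(
      Array[TypeInformation[_]](Types.LONG, Types.STRING),
      Array("id", "name"))

  override def getRowtimeAttribute: String = "rtime"
  override def getProctimeAttribute: String = "ptime"
}
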
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
index e9384c7..7797f22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -19,18 +19,16 @@
 package org.apache.flink.table.api.scala.stream
 
 import java.lang.{Integer => JInt, Long => JLong}
-import java.util.Collections
-import java.util.{List => JList}
 
 import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
@@ -151,7 +149,9 @@ class StreamTableEnvironmentTest extends TableTestBase {
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
 
-    val jTEnv = TableEnvironment.getTableEnvironment(mock(classOf[JStreamExecEnv]))
+    val jStreamExecEnv = mock(classOf[JStreamExecEnv])
+    when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+    val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]

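The mock above has to answer getStreamTimeCharacteristic, presumably because the table environment now queries it when tables with time attributes are registered. With a real environment the equivalent setup would be (standard streaming API calls):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// event-time must be enabled before rowtime attributes can be used
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
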
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 18066c9..cda90f7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, StreamTableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil.{term, unaryNode}
 import org.apache.flink.types.Row
@@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase {
   @Test
   def testRowTimeTableSourceSimple(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+    util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
 
     val t = util.tEnv.scan("rowTimeT").select("addTime, id, name, val")
 
@@ -43,7 +43,7 @@ class TableSourceTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
-        term("select", "addTime", "id", "name", "val")
+        term("select", "TIME_MATERIALIZATION(addTime) AS addTime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }
@@ -51,7 +51,7 @@ class TableSourceTest extends TableTestBase {
   @Test
   def testRowTimeTableSourceGroupWindow(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+    util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
 
     val t = util.tEnv.scan("rowTimeT")
       .filter("val > 100")
@@ -82,7 +82,7 @@ class TableSourceTest extends TableTestBase {
   @Test
   def testProcTimeTableSourceSimple(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+    util.tEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
 
     val t = util.tEnv.scan("procTimeT").select("pTime, id, name, val")
 
@@ -90,7 +90,7 @@ class TableSourceTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
-        term("select", "pTime", "id", "name", "val")
+        term("select", "TIME_MATERIALIZATION(pTime) AS pTime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }
@@ -98,7 +98,7 @@ class TableSourceTest extends TableTestBase {
   @Test
   def testProcTimeTableSourceOverWindow(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+    util.tEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
 
     val t = util.tEnv.scan("procTimeT")
       .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
@@ -123,8 +123,8 @@ class TableSourceTest extends TableTestBase {
   }
 }
 
-class TestRowTimeSource(timeField: String)
-    extends StreamTableSource[Row] with DefinedRowTimeAttribute {
+class TestRowtimeSource(timeField: String)
+    extends StreamTableSource[Row] with DefinedRowtimeAttribute {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
 
@@ -137,8 +137,8 @@ class TestRowTimeSource(timeField: String)
   }
 }
 
-class TestProcTimeSource(timeField: String)
-    extends StreamTableSource[Row] with DefinedProcTimeAttribute {
+class TestProctimeSource(timeField: String)
+    extends StreamTableSource[Row] with DefinedProctimeAttribute {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???