You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/01 01:52:12 UTC

[3/4] flink git commit: [FLINK-6563] [table] Add builders with time attribute support for KafkaTableSources.

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 5c9a629..0cc9801 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
@@ -35,6 +37,7 @@ import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.util.Utf8;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -43,31 +46,63 @@ import java.util.Properties;
  * <p>The version-specific Kafka consumers need to extend this class and
  * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
-public abstract class KafkaAvroTableSource extends KafkaTableSource {
+public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {
+
+	private final Class<? extends SpecificRecordBase> avroRecordClass;
+
+	private Map<String, String> fieldMapping;
 
 	/**
 	 * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
 	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param avroClass  Avro specific record.
+	 * @param topic            Kafka topic to consume.
+	 * @param properties       Properties for the Kafka consumer.
+	 * @param schema           Schema of the produced table.
+	 * @param avroRecordClass  Class of the Avro record that is read from the Kafka topic.
 	 */
-	KafkaAvroTableSource(
+	protected KafkaAvroTableSource(
 		String topic,
 		Properties properties,
-		Class<? extends SpecificRecordBase> avroClass) {
+		TableSchema schema,
+		Class<? extends SpecificRecordBase> avroRecordClass) {
 
 		super(
 			topic,
 			properties,
-			createDeserializationSchema(avroClass),
-			convertToRowTypeInformation(avroClass));
+			schema,
+			convertToRowTypeInformation(avroRecordClass));
+
+		this.avroRecordClass = avroRecordClass;
+	}
+
+	@Override
+	public Map<String, String> getFieldMapping() {
+		return fieldMapping;
+	}
+
+	@Override
+	public String explainSource() {
+		return "KafkaAvroTableSource(" + this.avroRecordClass.getSimpleName() + ")";
 	}
 
-	private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
-		return new AvroRowDeserializationSchema(record);
+	@Override
+	protected AvroRowDeserializationSchema getDeserializationSchema() {
+		return new AvroRowDeserializationSchema(avroRecordClass);
 	}
 
+	//////// SETTERS FOR OPTIONAL PARAMETERS
+
+	/**
+	 * Configures a field mapping for this TableSource.
+	 *
+	 * @param fieldMapping The field mapping.
+	 */
+	protected void setFieldMapping(Map<String, String> fieldMapping) {
+		this.fieldMapping = fieldMapping;
+	}
+
+	//////// HELPER METHODS
+
 	/**
 	 * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
 	 * Replaces generic Utf8 with basic String type information.
@@ -105,4 +140,61 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource {
 		}
 		return extracted;
 	}
+
+	/**
+	 * Abstract builder for a {@link KafkaAvroTableSource} to be extended by builders of subclasses of
+	 * KafkaAvroTableSource.
+	 *
+	 * @param <T> Type of the KafkaAvroTableSource produced by the builder.
+	 * @param <B> Type of the KafkaAvroTableSource.Builder subclass.
+	 */
+	protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder>
+		extends KafkaTableSource.Builder<T, B> {
+
+		private Class<? extends SpecificRecordBase> avroClass;
+
+		private Map<String, String> fieldMapping;
+
+		/**
+		 * Sets the class of the Avro records that aree read from the Kafka topic.
+		 *
+		 * @param avroClass The class of the Avro records that are read from the Kafka topic.
+		 * @return The builder.
+		 */
+		public B forAvroRecordClass(Class<? extends SpecificRecordBase> avroClass) {
+			this.avroClass = avroClass;
+			return builder();
+		}
+
+		/**
+		 * Sets a mapping from schema fields to fields of the produced Avro record.
+		 *
+		 * <p>A field mapping is required if the fields of produced tables should be named different than
+		 * the fields of the Avro record.
+		 * The key of the provided Map refers to the field of the table schema,
+		 * the value to the field of the Avro record.</p>
+		 *
+		 * @param schemaToAvroMapping A mapping from schema fields to Avro fields.
+		 * @return The builder.
+		 */
+		public B withTableToAvroMapping(Map<String, String> schemaToAvroMapping) {
+			this.fieldMapping = schemaToAvroMapping;
+			return builder();
+		}
+
+		/**
+		 * Returns the configured Avro class.
+		 *
+		 * @return The configured Avro class.
+		 */
+		protected Class<? extends SpecificRecordBase> getAvroRecordClass() {
+			return this.avroClass;
+		}
+
+		@Override
+		protected void configureTableSource(T source) {
+			super.configureTableSource(source);
+			source.setFieldMapping(this.fieldMapping);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 1c8e0a0..a91cc25 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
 
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -34,38 +36,159 @@ import java.util.Properties;
  *
  * <p>The field names are used to parse the JSON file and so are the types.
  */
-public abstract class KafkaJsonTableSource extends KafkaTableSource {
+public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {
+
+	private TableSchema jsonSchema;
+
+	private Map<String, String> fieldMapping;
+
+	private boolean failOnMissingField;
 
 	/**
 	 * Creates a generic Kafka JSON {@link StreamTableSource}.
 	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param typeInfo   Type information describing the result type. The field names are used
-	 *                   to parse the JSON file and so are the types.
+	 * @param topic       Kafka topic to consume.
+	 * @param properties  Properties for the Kafka consumer.
+	 * @param tableSchema The schema of the table.
+	 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 	 */
-	KafkaJsonTableSource(
-			String topic,
-			Properties properties,
-			TypeInformation<Row> typeInfo) {
+	protected KafkaJsonTableSource(
+		String topic,
+		Properties properties,
+		TableSchema tableSchema,
+		TableSchema jsonSchema) {
+
+		super(
+			topic,
+			properties,
+			tableSchema,
+			jsonSchemaToReturnType(jsonSchema));
+
+		this.jsonSchema = jsonSchema;
+	}
+
+	@Override
+	public Map<String, String> getFieldMapping() {
+		return fieldMapping;
+	}
+
+	@Override
+	protected JsonRowDeserializationSchema getDeserializationSchema() {
+		JsonRowDeserializationSchema deserSchema = new JsonRowDeserializationSchema(jsonSchemaToReturnType(jsonSchema));
+		deserSchema.setFailOnMissingField(failOnMissingField);
+		return deserSchema;
+	}
 
-		super(topic, properties, createDeserializationSchema(typeInfo), typeInfo);
+	@Override
+	public String explainSource() {
+		return "KafkaJSONTableSource";
 	}
 
+	//////// SETTERS FOR OPTIONAL PARAMETERS
+
 	/**
-	 * Configures the failure behaviour if a JSON field is missing.
+	 * Sets the flag that specifies the behavior in case of missing fields.
+	 * TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
 	 *
-	 * <p>By default, a missing field is ignored and the field is set to null.
+	 * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+	 */
+	protected void setFailOnMissingField(boolean failOnMissingField) {
+		this.failOnMissingField = failOnMissingField;
+	}
+
+	/**
+	 * Sets the mapping from table schema fields to JSON schema fields.
 	 *
-	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+	 * @param fieldMapping The mapping from table schema fields to JSON schema fields.
 	 */
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
-		deserializationSchema.setFailOnMissingField(failOnMissingField);
+	protected void setFieldMapping(Map<String, String> fieldMapping) {
+		this.fieldMapping = fieldMapping;
+	}
+
+	//////// HELPER METHODS
+
+	/** Converts the JSON schema into into the return type. */
+	private static RowTypeInfo jsonSchemaToReturnType(TableSchema jsonSchema) {
+		return new RowTypeInfo(jsonSchema.getTypes(), jsonSchema.getColumnNames());
 	}
 
-	private static JsonRowDeserializationSchema createDeserializationSchema(TypeInformation<Row> typeInfo) {
+	/**
+	 * Abstract builder for a {@link KafkaJsonTableSource} to be extended by builders of subclasses of
+	 * KafkaJsonTableSource.
+	 *
+	 * @param <T> Type of the KafkaJsonTableSource produced by the builder.
+	 * @param <B> Type of the KafkaJsonTableSource.Builder subclass.
+	 */
+	protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder>
+		extends KafkaTableSource.Builder<T, B> {
+
+		private TableSchema jsonSchema;
+
+		private Map<String, String> fieldMapping;
+
+		private boolean failOnMissingField = false;
+
+		/**
+		 * Sets the schema of the JSON-encoded Kafka messages.
+		 * If not set, the JSON messages are decoded with the table schema.
+		 *
+		 * @param jsonSchema The schema of the JSON-encoded Kafka messages.
+		 * @return The builder.
+		 */
+		public B forJsonSchema(TableSchema jsonSchema) {
+			this.jsonSchema = jsonSchema;
+			return builder();
+		}
+
+		/**
+		 * Sets a mapping from schema fields to fields of the JSON schema.
+		 *
+		 * <p>A field mapping is required if the fields of produced tables should be named different than
+		 * the fields of the JSON records.
+		 * The key of the provided Map refers to the field of the table schema,
+		 * the value to the field in the JSON schema.</p>
+		 *
+		 * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
+		 * @return The builder.
+		 */
+		public B withTableToJsonMapping(Map<String, String> tableToJsonMapping) {
+			this.fieldMapping = tableToJsonMapping;
+			return builder();
+		}
+
+		/**
+		 * Sets flag whether to fail if a field is missing or not.
+		 *
+		 * @param failOnMissingField If set to true, the TableSource fails if a missing fields.
+		 *                           If set to false, a missing field is set to null.
+		 * @return The builder.
+		 */
+		public B failOnMissingField(boolean failOnMissingField) {
+			this.failOnMissingField = failOnMissingField;
+			return builder();
+		}
+
+		/**
+		 * Returns the configured JSON schema. If no JSON schema was configured, the table schema
+		 * is returned.
+		 *
+		 * @return The JSON schema for the TableSource.
+		 */
+		protected TableSchema getJsonSchema() {
+			if (jsonSchema != null) {
+				return this.jsonSchema;
+			} else {
+				return getTableSchema();
+			}
+		}
 
-		return new JsonRowDeserializationSchema(typeInfo);
+		@Override
+		protected void configureTableSource(T source) {
+			super.configureTableSource(source);
+			// configure field mapping
+			source.setFieldMapping(this.fieldMapping);
+			// configure missing field behavior
+			source.setFailOnMissingField(this.failOnMissingField);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index a9cf235..0bd04e4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -23,19 +23,35 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.DefinedProctimeAttribute;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 
+import scala.Option;
+
 /**
  * A version-agnostic Kafka {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
  * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
-public abstract class KafkaTableSource implements StreamTableSource<Row> {
+public abstract class KafkaTableSource
+	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+
+	/** The schema of the table. */
+	private final TableSchema schema;
 
 	/** The Kafka topic to consume. */
 	private final String topic;
@@ -43,30 +59,33 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
 	/** Properties for the Kafka consumer. */
 	private final Properties properties;
 
-	/** Deserialization schema to use for Kafka records. */
-	private final DeserializationSchema<Row> deserializationSchema;
-
 	/** Type information describing the result type. */
-	private final TypeInformation<Row> typeInfo;
+	private TypeInformation<Row> returnType;
+
+	/** Field name of the processing time attribute, null if no processing time field is defined. */
+	private String proctimeAttribute;
+
+	/** Descriptor for a rowtime attribute. */
+	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
 
 	/**
 	 * Creates a generic Kafka {@link StreamTableSource}.
 	 *
 	 * @param topic                 Kafka topic to consume.
 	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param typeInfo              Type information describing the result type.
+	 * @param schema                Schema of the produced table.
+	 * @param returnType            Type information of the produced physical DataStream.
 	 */
-	KafkaTableSource(
+	protected KafkaTableSource(
 			String topic,
 			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			TypeInformation<Row> typeInfo) {
+			TableSchema schema,
+			TypeInformation<Row> returnType) {
 
-		this.topic = Preconditions.checkNotNull(topic, "Topic");
-		this.properties = Preconditions.checkNotNull(properties, "Properties");
-		this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
-		this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information");
+		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+		this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
+		this.returnType = Preconditions.checkNotNull(returnType, "Type information must not be null.");
 	}
 
 	/**
@@ -75,6 +94,8 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
 	 */
 	@Override
 	public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+
+		DeserializationSchema<Row> deserializationSchema = getDeserializationSchema();
 		// Version-specific Kafka consumer
 		FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
 		return env.addSource(kafkaConsumer);
@@ -82,14 +103,65 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
 
 	@Override
 	public TypeInformation<Row> getReturnType() {
-		return typeInfo;
+		return returnType;
 	}
 
 	@Override
 	public TableSchema getTableSchema() {
-		return TableSchema.fromTypeInfo(typeInfo);
+		return schema;
+	}
+
+	@Override
+	public String getProctimeAttribute() {
+		return proctimeAttribute;
+	}
+
+	@Override
+	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
+		return rowtimeAttributeDescriptors;
 	}
 
+	//////// SETTERS FOR OPTIONAL PARAMETERS
+
+	/**
+	 * Declares a field of the schema to be the processing time attribute.
+	 *
+	 * @param proctimeAttribute The name of the field that becomes the processing time field.
+	 */
+	protected void setProctimeAttribute(String proctimeAttribute) {
+		if (proctimeAttribute != null) {
+			// validate that field exists and is of correct type
+			Option<TypeInformation<?>> tpe = schema.getType(proctimeAttribute);
+			if (tpe.isEmpty()) {
+				throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not present in TableSchema.");
+			} else if (tpe.get() != Types.SQL_TIMESTAMP()) {
+				throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not of type SQL_TIMESTAMP.");
+			}
+		}
+		this.proctimeAttribute = proctimeAttribute;
+	}
+
+	/**
+	 * Declares a list of fields to be rowtime attributes.
+	 *
+	 * @param rowtimeAttributeDescriptors The descriptors of the rowtime attributes.
+	 */
+	protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
+		// validate that all declared fields exist and are of correct type
+		for (RowtimeAttributeDescriptor desc : rowtimeAttributeDescriptors) {
+			String rowtimeAttribute = desc.getAttributeName();
+			Option<TypeInformation<?>> tpe = schema.getType(rowtimeAttribute);
+			if (tpe.isEmpty()) {
+				throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not present in TableSchema.");
+			} else if (tpe.get() != Types.SQL_TIMESTAMP()) {
+				throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not of type SQL_TIMESTAMP.");
+			}
+		}
+		this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
+	}
+
+	//////// ABSTRACT METHODS FOR SUBCLASSES
+
 	/**
 	 * Returns the version-specific Kafka consumer.
 	 *
@@ -108,12 +180,195 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
 	 *
 	 * @return The deserialization schema
 	 */
-	protected DeserializationSchema<Row> getDeserializationSchema() {
-		return deserializationSchema;
-	}
+	protected abstract DeserializationSchema<Row> getDeserializationSchema();
 
-	@Override
-	public String explainSource() {
-		return "";
+	/**
+	 * Abstract builder for a {@link KafkaTableSource} to be extended by builders of subclasses of
+	 * KafkaTableSource.
+	 *
+	 * @param <T> Type of the KafkaTableSource produced by the builder.
+	 * @param <B> Type of the KafkaTableSource.Builder subclass.
+	 */
+	protected abstract static class Builder<T extends KafkaTableSource, B extends KafkaTableSource.Builder> {
+
+		private String topic;
+
+		private Properties kafkaProps;
+
+		private TableSchema schema;
+
+		private String proctimeAttribute;
+
+		private RowtimeAttributeDescriptor rowtimeAttributeDescriptor;
+
+		/**
+		 * Sets the topic from which the table is read.
+		 *
+		 * @param topic The topic from which the table is read.
+		 * @return The builder.
+		 */
+		public B forTopic(String topic) {
+			Preconditions.checkNotNull(topic, "Topic must not be null.");
+			Preconditions.checkArgument(this.topic == null, "Topic has already been set.");
+			this.topic = topic;
+			return builder();
+		}
+
+		/**
+		 * Sets the configuration properties for the Kafka consumer.
+		 *
+		 * @param props The configuration properties for the Kafka consumer.
+		 * @return The builder.
+		 */
+		public B withKafkaProperties(Properties props) {
+			Preconditions.checkNotNull(props, "Properties must not be null.");
+			Preconditions.checkArgument(this.kafkaProps == null, "Properties have already been set.");
+			this.kafkaProps = props;
+			return builder();
+		}
+
+		/**
+		 * Sets the schema of the produced table.
+		 *
+		 * @param schema The schema of the produced table.
+		 * @return The builder.
+		 */
+		public B withSchema(TableSchema schema) {
+			Preconditions.checkNotNull(schema, "Schema must not be null.");
+			Preconditions.checkArgument(this.schema == null, "Schema has already been set.");
+			this.schema = schema;
+			return builder();
+		}
+
+		/**
+		 * Configures a field of the table to be a processing time attribute.
+		 * The configured field must be present in the tabel schema and of type {@link Types#SQL_TIMESTAMP()}.
+		 *
+		 * @param proctimeAttribute The name of the processing time attribute in the table schema.
+		 * @return The builder.
+		 */
+		public B withProctimeAttribute(String proctimeAttribute) {
+			Preconditions.checkNotNull(proctimeAttribute, "Proctime attribute must not be null.");
+			Preconditions.checkArgument(!proctimeAttribute.isEmpty(), "Proctime attribute must not be empty.");
+			Preconditions.checkArgument(this.proctimeAttribute == null, "Proctime attribute has already been set.");
+			this.proctimeAttribute = proctimeAttribute;
+			return builder();
+		}
+
+		/**
+		 * Configures a field of the table to be a rowtime attribute.
+		 * The configured field must be present in the tabel schema and of type {@link Types#SQL_TIMESTAMP()}.
+		 *
+		 * @param rowtimeAttribute The name of the rowtime attribute in the table schema.
+		 * @param timestampExtractor The {@link TimestampExtractor} to extract the rowtime attribute from the physical type.
+		 * @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
+		 * @return The builder.
+		 */
+		public B withRowtimeAttribute(
+				String rowtimeAttribute,
+				TimestampExtractor timestampExtractor,
+				WatermarkStrategy watermarkStrategy) {
+			Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
+			Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
+			Preconditions.checkNotNull(timestampExtractor, "Timestamp extractor must not be null.");
+			Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
+			Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
+				"Currently, only one rowtime attribute is supported.");
+
+			this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
+				rowtimeAttribute,
+				timestampExtractor,
+				watermarkStrategy);
+			return builder();
+		}
+
+		/**
+		 * Configures the Kafka timestamp to be a rowtime attribute.
+		 *
+		 * <p>Note: Kafka supports message timestamps only since version 0.10.</p>
+		 *
+		 * @param rowtimeAttribute The name of the rowtime attribute in the table schema.
+		 * @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
+		 * @return The builder.
+		 */
+		public B withKafkaTimestampAsRowtimeAttribute(
+				String rowtimeAttribute,
+				WatermarkStrategy watermarkStrategy) {
+
+			Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
+			Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
+			Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
+			Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
+				"Currently, only one rowtime attribute is supported.");
+			Preconditions.checkArgument(supportsKafkaTimestamps(), "Kafka timestamps are only supported since Kafka 0.10.");
+
+			this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
+				rowtimeAttribute,
+				new StreamRecordTimestamp(),
+				watermarkStrategy);
+			return builder();
+		}
+
+		/**
+		 * Returns the configured topic.
+		 *
+		 * @return the configured topic.
+		 */
+		protected String getTopic() {
+			return this.topic;
+		}
+
+		/**
+		 * Returns the configured Kafka properties.
+		 *
+		 * @return the configured Kafka properties.
+		 */
+		protected Properties getKafkaProps() {
+			return this.kafkaProps;
+		}
+
+		/**
+		 * Returns the configured table schema.
+		 *
+		 * @return the configured table schema.
+		 */
+		protected TableSchema getTableSchema() {
+			return this.schema;
+		}
+
+		/**
+		 * True if the KafkaSource supports Kafka timestamps, false otherwise.
+		 *
+		 * @return True if the KafkaSource supports Kafka timestamps, false otherwise.
+		 */
+		protected abstract boolean supportsKafkaTimestamps();
+
+		/**
+		 * Configures a TableSource with optional parameters.
+		 *
+		 * @param tableSource The TableSource to configure.
+		 */
+		protected void configureTableSource(T tableSource) {
+			// configure processing time attributes
+			tableSource.setProctimeAttribute(proctimeAttribute);
+			// configure rowtime attributes
+			if (rowtimeAttributeDescriptor == null) {
+				tableSource.setRowtimeAttributeDescriptors(Collections.emptyList());
+			} else {
+				tableSource.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+			}
+		}
+
+		/**
+		 * Returns the builder.
+		 * @return the builder.
+		 */
+		protected abstract B builder();
+
+		/**
+		 * Builds the configured {@link KafkaTableSource}.
+		 * @return The configured {@link KafkaTableSource}.
+		 */
+		protected abstract KafkaTableSource build();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
new file mode 100644
index 0000000..def16b2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.table.api.Types;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Abstract test base for all Kafka Avro table sources.
+ */
+public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestBase {
+
+	@Override
+	protected void configureBuilder(KafkaTableSource.Builder builder) {
+		super.configureBuilder(builder);
+		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
+	}
+
+	@Test
+	public void testSameFieldsAvroClass() {
+		KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
+		this.configureBuilder(b);
+
+		KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
+
+		// check return type
+		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+		assertNotNull(returnType);
+		assertEquals(5, returnType.getArity());
+		// check field names
+		assertEquals("field1", returnType.getFieldNames()[0]);
+		assertEquals("field2", returnType.getFieldNames()[1]);
+		assertEquals("time1", returnType.getFieldNames()[2]);
+		assertEquals("time2", returnType.getFieldNames()[3]);
+		assertEquals("field3", returnType.getFieldNames()[4]);
+		// check field types
+		assertEquals(Types.LONG(), returnType.getTypeAt(0));
+		assertEquals(Types.STRING(), returnType.getTypeAt(1));
+		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
+		assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
+
+		// check field mapping
+		assertNull(source.getFieldMapping());
+	}
+
+	@Test
+	public void testDifferentFieldsAvroClass() {
+		KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
+		super.configureBuilder(b);
+		b.withProctimeAttribute("time2");
+
+		Map<String, String> mapping = new HashMap<>();
+		mapping.put("field1", "otherField1");
+		mapping.put("field2", "otherField2");
+		mapping.put("field3", "otherField3");
+
+		// set Avro class with different fields
+		b.forAvroRecordClass(DifferentFieldsAvroClass.class);
+		b.withTableToAvroMapping(mapping);
+
+		KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
+
+		// check return type
+		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+		assertNotNull(returnType);
+		assertEquals(6, returnType.getArity());
+		// check field names
+		assertEquals("otherField1", returnType.getFieldNames()[0]);
+		assertEquals("otherField2", returnType.getFieldNames()[1]);
+		assertEquals("otherTime1", returnType.getFieldNames()[2]);
+		assertEquals("otherField3", returnType.getFieldNames()[3]);
+		assertEquals("otherField4", returnType.getFieldNames()[4]);
+		assertEquals("otherField5", returnType.getFieldNames()[5]);
+		// check field types
+		assertEquals(Types.LONG(), returnType.getTypeAt(0));
+		assertEquals(Types.STRING(), returnType.getTypeAt(1));
+		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+		assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
+		assertEquals(Types.BYTE(), returnType.getTypeAt(4));
+		assertEquals(Types.INT(), returnType.getTypeAt(5));
+
+		// check field mapping
+		Map<String, String> fieldMapping = source.getFieldMapping();
+		assertNotNull(fieldMapping);
+		assertEquals(3, fieldMapping.size());
+		assertEquals("otherField1", fieldMapping.get("field1"));
+		assertEquals("otherField2", fieldMapping.get("field2"));
+		assertEquals("otherField3", fieldMapping.get("field3"));
+	}
+
+	/**
+	 * Avro record that matches the table schema.
+	 */
+	@SuppressWarnings("unused")
+	public static class SameFieldsAvroClass extends SpecificRecordBase {
+
+		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
+		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
+		//CHECKSTYLE.ON: StaticVariableNameCheck
+
+		public Long field1;
+		public String field2;
+		public Timestamp time1;
+		public Timestamp time2;
+		public Double field3;
+
+		@Override
+		public Schema getSchema() {
+			return null;
+		}
+
+		@Override
+		public Object get(int field) {
+			return null;
+		}
+
+		@Override
+		public void put(int field, Object value) { }
+	}
+
+	/**
+	 * Avro record that does NOT match the table schema.
+	 */
+	@SuppressWarnings("unused")
+	public static class DifferentFieldsAvroClass extends SpecificRecordBase {
+
+		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
+		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(
+			new String[]{"otherField1", "otherField2", "otherTime1", "otherField3", "otherField4", "otherField5"},
+			new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.DOUBLE(), Types.BYTE(), Types.INT()});
+		//CHECKSTYLE.ON: StaticVariableNameCheck
+
+		public Long otherField1;
+		public String otherField2;
+		public Timestamp otherTime1;
+		public Double otherField3;
+		public Byte otherField4;
+		public Integer otherField5;
+
+		@Override
+		public Schema getSchema() {
+			return null;
+		}
+
+		@Override
+		public Object get(int field) {
+			return null;
+		}
+
+		@Override
+		public void put(int field, Object value) { }
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
new file mode 100644
index 0000000..9b867f6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Abstract test base for all Kafka JSON table sources.
+ */
+public abstract class KafkaJsonTableSourceTestBase extends KafkaTableSourceTestBase {
+
+	@Test
+	public void testJsonEqualsTableSchema() {
+		KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
+		this.configureBuilder(b);
+
+		KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
+
+		// check return type
+		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+		assertNotNull(returnType);
+		assertEquals(5, returnType.getArity());
+		// check field names
+		assertEquals("field1", returnType.getFieldNames()[0]);
+		assertEquals("field2", returnType.getFieldNames()[1]);
+		assertEquals("time1", returnType.getFieldNames()[2]);
+		assertEquals("time2", returnType.getFieldNames()[3]);
+		assertEquals("field3", returnType.getFieldNames()[4]);
+		// check field types
+		assertEquals(Types.LONG(), returnType.getTypeAt(0));
+		assertEquals(Types.STRING(), returnType.getTypeAt(1));
+		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+		assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
+		assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
+
+		// check field mapping
+		assertNull(source.getFieldMapping());
+	}
+
+	@Test
+	public void testCustomJsonSchemaWithMapping() {
+		KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
+		super.configureBuilder(b);
+		b.withProctimeAttribute("time2");
+
+		Map<String, String> mapping = new HashMap<>();
+		mapping.put("field1", "otherField1");
+		mapping.put("field2", "otherField2");
+		mapping.put("field3", "otherField3");
+
+		// set Avro class with different fields
+		b.forJsonSchema(TableSchema.builder()
+			.field("otherField1", Types.LONG())
+			.field("otherField2", Types.STRING())
+			.field("rowtime", Types.LONG())
+			.field("otherField3", Types.DOUBLE())
+			.field("otherField4", Types.BYTE())
+			.field("otherField5", Types.INT()).build());
+		b.withTableToJsonMapping(mapping);
+		b.withRowtimeAttribute("time1", new ExistingField("timeField1"), new AscendingTimestamps());
+
+		KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
+
+		// check return type
+		RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+		assertNotNull(returnType);
+		assertEquals(6, returnType.getArity());
+		// check field names
+		assertEquals("otherField1", returnType.getFieldNames()[0]);
+		assertEquals("otherField2", returnType.getFieldNames()[1]);
+		assertEquals("rowtime", returnType.getFieldNames()[2]);
+		assertEquals("otherField3", returnType.getFieldNames()[3]);
+		assertEquals("otherField4", returnType.getFieldNames()[4]);
+		assertEquals("otherField5", returnType.getFieldNames()[5]);
+		// check field types
+		assertEquals(Types.LONG(), returnType.getTypeAt(0));
+		assertEquals(Types.STRING(), returnType.getTypeAt(1));
+		assertEquals(Types.LONG(), returnType.getTypeAt(2));
+		assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
+		assertEquals(Types.BYTE(), returnType.getTypeAt(4));
+		assertEquals(Types.INT(), returnType.getTypeAt(5));
+
+		// check field mapping
+		Map<String, String> fieldMapping = source.getFieldMapping();
+		assertNotNull(fieldMapping);
+		assertEquals(3, fieldMapping.size());
+		assertEquals("otherField1", fieldMapping.get("field1"));
+		assertEquals("otherField2", fieldMapping.get("field2"));
+		assertEquals("otherField3", fieldMapping.get("field3"));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 8028bfc..218401c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -18,20 +18,27 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
 import org.apache.flink.types.Row;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -43,69 +50,155 @@ import static org.mockito.Mockito.verify;
  */
 public abstract class KafkaTableSourceTestBase {
 
-	private static final String TOPIC = "testTopic";
-	private static final String[] FIELD_NAMES = new String[] { "mylong", "mystring", "myboolean", "mydouble", "missingField" };
-	private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] {
-		BasicTypeInfo.LONG_TYPE_INFO,
-		BasicTypeInfo.STRING_TYPE_INFO,
-		BasicTypeInfo.BOOLEAN_TYPE_INFO,
-		BasicTypeInfo.DOUBLE_TYPE_INFO,
-		BasicTypeInfo.LONG_TYPE_INFO };
-	private static final Properties PROPERTIES = createSourceProperties();
-
-	/**
-	 * Avro record that matches above schema.
-	 */
-	public static class AvroSpecificRecord extends SpecificRecordBase {
-
-		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
-		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
-		//CHECKSTYLE.ON: StaticVariableNameCheck
-
-		public Long mylong;
-		public String mystring;
-		public Boolean myboolean;
-		public Double mydouble;
-		public Long missingField;
-
-		@Override
-		public Schema getSchema() {
-			return null;
-		}
-
-		@Override
-		public Object get(int field) {
-			return null;
-		}
+	static final String[] FIELD_NAMES =
+		new String[]{"field1", "field2", "time1", "time2", "field3"};
+	static final TypeInformation[] FIELD_TYPES =
+		new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.DOUBLE()};
 
-		@Override
-		public void put(int field, Object value) {
-
-		}
-	}
+	private static final String TOPIC = "testTopic";
+	private static final TableSchema SCHEMA = new TableSchema(FIELD_NAMES, FIELD_TYPES);
+	private static final Properties PROPS = createSourceProperties();
 
 	@Test
-	public void testKafkaTableSource() {
-		KafkaTableSource kafkaTableSource = spy(createTableSource());
+	public void testKafkaConsumer() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+
+		// assert that correct
+		KafkaTableSource observed = spy(b.build());
 		StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
-		kafkaTableSource.getDataStream(env);
+		observed.getDataStream(env);
 
 		verify(env).addSource(any(getFlinkKafkaConsumer()));
 
-		verify(kafkaTableSource).getKafkaConsumer(
+		verify(observed).getKafkaConsumer(
 			eq(TOPIC),
-			eq(PROPERTIES),
+			eq(PROPS),
 			any(getDeserializationSchema()));
 	}
 
-	protected abstract KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
+	@Test
+	public void testTableSchema() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+
+		KafkaTableSource source = b.build();
+
+		// check table schema
+		TableSchema schema = source.getTableSchema();
+		assertNotNull(schema);
+		assertEquals(5, schema.getColumnNames().length);
+		// check table fields
+		assertEquals("field1", schema.getColumnNames()[0]);
+		assertEquals("field2", schema.getColumnNames()[1]);
+		assertEquals("time1", schema.getColumnNames()[2]);
+		assertEquals("time2", schema.getColumnNames()[3]);
+		assertEquals("field3", schema.getColumnNames()[4]);
+		assertEquals(Types.LONG(), schema.getTypes()[0]);
+		assertEquals(Types.STRING(), schema.getTypes()[1]);
+		assertEquals(Types.SQL_TIMESTAMP(), schema.getTypes()[2]);
+		assertEquals(Types.SQL_TIMESTAMP(), schema.getTypes()[3]);
+		assertEquals(Types.DOUBLE(), schema.getTypes()[4]);
+	}
+
+	@Test
+	public void testNoTimeAttributes() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+
+		KafkaTableSource source = b.build();
+
+		// assert no proctime
+		assertNull(source.getProctimeAttribute());
+		// assert no rowtime
+		assertNotNull(source.getRowtimeAttributeDescriptors());
+		assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
+	}
+
+	@Test
+	public void testProctimeAttribute() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+		b.withProctimeAttribute("time1");
+
+		KafkaTableSource source = b.build();
+
+		// assert correct proctime field
+		assertEquals(source.getProctimeAttribute(), "time1");
+
+		// assert no rowtime
+		assertNotNull(source.getRowtimeAttributeDescriptors());
+		assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
+	}
+
+	@Test
+	public void testRowtimeAttribute() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+		b.withRowtimeAttribute("time2", new ExistingField("time2"), new AscendingTimestamps());
+
+		KafkaTableSource source = b.build();
+
+		// assert no proctime
+		assertNull(source.getProctimeAttribute());
+
+		// assert correct rowtime descriptor
+		List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
+		assertNotNull(descs);
+		assertEquals(1, descs.size());
+		RowtimeAttributeDescriptor desc = descs.get(0);
+		assertEquals("time2", desc.getAttributeName());
+		// assert timestamp extractor
+		assertTrue(desc.getTimestampExtractor() instanceof ExistingField);
+		assertEquals(1, desc.getTimestampExtractor().getArgumentFields().length);
+		assertEquals("time2", desc.getTimestampExtractor().getArgumentFields()[0]);
+		// assert watermark strategy
+		assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
+	}
+
+	@Test
+	public void testKafkaTSRowtimeAttribute() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+
+		try {
+			b.withKafkaTimestampAsRowtimeAttribute("time2", new AscendingTimestamps());
+
+			KafkaTableSource source = b.build();
+
+			// assert no proctime
+			assertNull(source.getProctimeAttribute());
+
+			// assert correct rowtime descriptor
+			List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
+			assertNotNull(descs);
+			assertEquals(1, descs.size());
+			RowtimeAttributeDescriptor desc = descs.get(0);
+			assertEquals("time2", desc.getAttributeName());
+			// assert timestamp extractor
+			assertTrue(desc.getTimestampExtractor() instanceof StreamRecordTimestamp);
+			assertTrue(desc.getTimestampExtractor().getArgumentFields().length == 0);
+			// assert watermark strategy
+			assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
+		} catch (Exception e) {
+			if (b.supportsKafkaTimestamps()) {
+				// builder should support Kafka timestamps
+				fail();
+			}
+		}
+	}
+
+	protected abstract KafkaTableSource.Builder getBuilder();
 
 	protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
 
 	protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
 
-	private KafkaTableSource createTableSource() {
-		return createTableSource(TOPIC, PROPERTIES, Types.ROW(FIELD_NAMES, FIELD_TYPES));
+	protected void configureBuilder(KafkaTableSource.Builder builder) {
+		builder
+			.forTopic(TOPIC)
+			.withKafkaProperties(PROPS)
+			.withSchema(SCHEMA);
 	}
 
 	private static Properties createSourceProperties() {
@@ -114,4 +207,5 @@ public abstract class KafkaTableSourceTestBase {
 		properties.setProperty("group.id", "dummy");
 		return properties;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index f7ef710..f4d928f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -20,6 +20,8 @@ package org.apache.flink.table.api
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 
+import _root_.scala.collection.mutable.ArrayBuffer
+
 /**
   * A TableSchema represents a Table's structure.
   */
@@ -154,4 +156,24 @@ object TableSchema {
     }
   }
 
+  def builder(): TableSchemaBuilder = {
+    new TableSchemaBuilder
+  }
+
+}
+
+class TableSchemaBuilder {
+
+  private val fieldNames: ArrayBuffer[String] = new ArrayBuffer[String]()
+  private val fieldTypes: ArrayBuffer[TypeInformation[_]] = new ArrayBuffer[TypeInformation[_]]()
+
+  def field(name: String, tpe: TypeInformation[_]): TableSchemaBuilder = {
+    fieldNames.append(name)
+    fieldTypes.append(tpe)
+    this
+  }
+
+  def build(): TableSchema = {
+    new TableSchema(fieldNames.toArray, fieldTypes.toArray)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index be7f84f..a794b08 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -33,13 +33,13 @@ import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator}
 import org.apache.flink.table.codegen.calls.ScalarOperators._
-import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions}
+import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
@@ -241,9 +241,9 @@ abstract class CodeGenerator(
     *
     * @param returnType conversion target type. Inputs and output must have the same arity.
     * @param resultFieldNames result field names necessary for a mapping to POJO fields.
-    * @param rowtimeExpression an optional expression to extract the value of a rowtime field from
-    *                          the input data. If not set, the value of rowtime field is set to the
-    *                          StreamRecord timestamp.
+    * @param rowtimeExpression an expression to extract the value of a rowtime field from
+    *                          the input data. Required if the field indicies include a rowtime
+    *                          marker.
     * @return instance of GeneratedExpression
     */
   def generateConverterResultExpression(
@@ -254,16 +254,12 @@ abstract class CodeGenerator(
 
     val input1AccessExprs = input1Mapping.map {
       case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
+           TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER if rowtimeExpression.isDefined =>
+          // generate rowtime attribute from expression
+          generateExpression(rowtimeExpression.get)
+      case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
            TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
-        // attribute is a rowtime indicator.
-        rowtimeExpression match {
-          case Some(expr) =>
-            // generate rowtime attribute from expression
-            generateExpression(expr)
-          case _ =>
-            // fetch rowtime attribute from StreamRecord timestamp field
-            generateRowtimeAccess()
-        }
+          throw TableException("Rowtime extraction expression missing. Please report a bug.")
       case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
         // attribute is proctime indicator.
         // we use a null literal and generate a timestamp when we need it.
@@ -845,7 +841,7 @@ abstract class CodeGenerator(
         val operand = operands.head
         requireTimeInterval(operand)
         generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand)
-        
+
       // comparison
       case EQUALS =>
         val left = operands.head
@@ -994,6 +990,9 @@ abstract class CodeGenerator(
       case ScalarSqlFunctions.CONCAT_WS =>
         generateConcatWs(operands)
 
+      case StreamRecordTimestampSqlFunction =>
+        generateStreamRecordRowtimeAccess()
+
       // advanced scalar functions
       case sqlOperator: SqlOperator =>
         val callGen = FunctionGenerator.getCallGenerator(
@@ -1298,7 +1297,7 @@ abstract class CodeGenerator(
     }
   }
 
-  private[flink] def generateRowtimeAccess(): GeneratedExpression = {
+  private[flink] def generateStreamRecordRowtimeAccess(): GeneratedExpression = {
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
 
@@ -1313,7 +1312,7 @@ abstract class CodeGenerator(
         |boolean $nullTerm = false;
        """.stripMargin
 
-    GeneratedExpression(resultTerm, nullTerm, accessCode, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+    GeneratedExpression(resultTerm, nullTerm, accessCode, Types.LONG)
   }
 
   private[flink] def generateProctimeTimestamp(): GeneratedExpression = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index bad5889..f3ef039 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.functions.sql.StreamRecordTimestampSqlFunction
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
@@ -163,9 +164,9 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
 
   override private[flink] def validateInput(): ValidationResult = {
     child match {
-      case WindowReference(_, Some(tpe)) if isProctimeIndicatorType(tpe) =>
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isProctimeIndicatorType(tpe) =>
         ValidationFailure("A proctime window cannot provide a rowtime attribute.")
-      case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
         // rowtime window
         ValidationSuccess
       case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
@@ -181,7 +182,7 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
 
   override def resultType: TypeInformation[_] = {
     child match {
-      case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
         // rowtime window
         TimeIndicatorTypeInfo.ROWTIME_INDICATOR
       case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
@@ -203,7 +204,7 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
 
   override private[flink] def validateInput(): ValidationResult = {
     child match {
-      case WindowReference(_, Some(tpe)) if isTimeIndicatorType(tpe) =>
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isTimeIndicatorType(tpe) =>
         ValidationSuccess
       case WindowReference(_, _) =>
         ValidationFailure("Reference to a rowtime or proctime window required.")
@@ -221,3 +222,13 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
 
   override def toString: String = s"proctime($child)"
 }
+
+/** Expression to access the timestamp of a StreamRecord. */
+case class StreamRecordTimestamp() extends LeafExpression {
+
+  override private[flink] def resultType = Types.LONG
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.getRexBuilder.makeCall(StreamRecordTimestampSqlFunction)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala
new file mode 100644
index 0000000..10be75c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.sql
+
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+
+/**
+  * Function to access the timestamp of a StreamRecord.
+  */
+object StreamRecordTimestampSqlFunction extends SqlFunction(
+  "STREAMRECORD_TIMESTAMP",
+  SqlKind.OTHER_FUNCTION,
+  ReturnTypes.explicit(SqlTypeName.BIGINT),
+  InferTypes.RETURN_TYPE,
+  OperandTypes.family(SqlTypeFamily.NUMERIC),
+  SqlFunctionCategory.SYSTEM) {
+
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+  override def isDeterministic: Boolean = true
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 59b3120..f4bd3b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -22,11 +22,14 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.expressions.Cast
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.schema.DataStreamTable
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /**
   * Flink RelNode which matches along with DataStreamSource.
@@ -61,7 +64,22 @@ class DataStreamScan(
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     val fieldIdxs = dataStreamTable.fieldIndexes
-    convertToInternalRow(schema, inputDataStream, fieldIdxs, config, None)
+
+    // get expression to extract timestamp
+    val rowtimeExpr: Option[RexNode] =
+      if (fieldIdxs.contains(TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER)) {
+        // extract timestamp from StreamRecord
+        Some(
+          Cast(
+            org.apache.flink.table.expressions.StreamRecordTimestamp(),
+            TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+            .toRexNode(tableEnv.getRelBuilder))
+      } else {
+        None
+      }
+
+    // convert DataStream
+    convertToInternalRow(schema, inputDataStream, fieldIdxs, config, rowtimeExpr)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 46acd6c..5d305b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources._
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
index 6a2ccc9..fcfc2ca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
@@ -22,6 +22,7 @@ import java.util.{Map => JMap}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
 
 /**
   * The [[DefinedFieldMapping]] interface provides a mapping for the fields of the table schema

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
index bfc06f9..f09baa3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -22,6 +22,8 @@ import java.util
 
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy
 
 /**
   * Extends a [[TableSource]] to specify a processing time attribute.

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
deleted file mode 100644
index e0f01d5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
-
-/**
-  * Provides the an expression to extract the timestamp for a rowtime attribute.
-  */
-abstract class TimestampExtractor extends FieldComputer[Long] {
-
-  /** Timestamp extractors compute the timestamp as Long. */
-  override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]
-}
-
-/**
-  * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
-  *
-  * @param field The field to convert into a rowtime attribute.
-  */
-class ExistingField(field: String) extends TimestampExtractor {
-
-  override def getArgumentFields: Array[String] = Array(field)
-
-  @throws[ValidationException]
-  override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
-
-    // get type of field to convert
-    val fieldType = physicalFieldTypes(0)
-
-    // check that the field to convert is of type Long or Timestamp
-    fieldType match {
-      case Types.LONG => // OK
-      case Types.SQL_TIMESTAMP => // OK
-      case _: TypeInformation[_] =>
-        throw ValidationException(
-          s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
-    }
-  }
-
-  /**
-    * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
-    * rowtime attribute.
-    */
-  def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-
-    val fieldAccess: Expression = fieldAccesses(0)
-
-    fieldAccess.resultType match {
-      case Types.LONG =>
-        // access LONG field
-        fieldAccess
-      case Types.SQL_TIMESTAMP =>
-        // cast timestamp to long
-        Cast(fieldAccess, Types.LONG)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
new file mode 100644
index 0000000..12cd564
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
+
+/**
+  * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
+  *
+  * @param field The field to convert into a rowtime attribute.
+  */
+class ExistingField(field: String) extends TimestampExtractor {
+
+  override def getArgumentFields: Array[String] = Array(field)
+
+  @throws[ValidationException]
+  override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
+
+    // get type of field to convert
+    val fieldType = physicalFieldTypes(0)
+
+    // check that the field to convert is of type Long or Timestamp
+    fieldType match {
+      case Types.LONG => // OK
+      case Types.SQL_TIMESTAMP => // OK
+      case _: TypeInformation[_] =>
+        throw ValidationException(
+          s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
+    }
+  }
+
+  /**
+    * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
+    * rowtime attribute.
+    */
+  def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+
+    val fieldAccess: Expression = fieldAccesses(0)
+
+    fieldAccess.resultType match {
+      case Types.LONG =>
+        // access LONG field
+        fieldAccess
+      case Types.SQL_TIMESTAMP =>
+        // cast timestamp to long
+        Cast(fieldAccess, Types.LONG)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
new file mode 100644
index 0000000..329f790
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
+
+/**
+  * Extracts the timestamp of a StreamRecord into a rowtime attribute.
+  *
+  * Note: This extractor only works for StreamTableSources.
+  */
+class StreamRecordTimestamp extends TimestampExtractor {
+
+  /** No argument fields required. */
+  override def getArgumentFields: Array[String] = Array()
+
+  /** No validation required. */
+  @throws[ValidationException]
+  override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = { }
+
+  /**
+    * Returns an [[Expression]] that extracts the timestamp of a StreamRecord.
+    */
+  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+    org.apache.flink.table.expressions.StreamRecordTimestamp()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
new file mode 100644
index 0000000..34c6ba5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.FieldComputer
+
+/**
+  * Provides the an expression to extract the timestamp for a rowtime attribute.
+  */
+abstract class TimestampExtractor extends FieldComputer[Long] {
+
+  /** Timestamp extractors compute the timestamp as Long. */
+  override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
deleted file mode 100644
index eec423f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.types.Row
-
-/**
-  * Provides a strategy to generate watermarks for a rowtime attribute.
-  *
-  * A watermark strategy is either a [[PeriodicWatermarkAssigner]] or
-  * [[PunctuatedWatermarkAssigner]].
-  *
-  */
-sealed abstract class WatermarkStrategy extends Serializable
-
-/** A periodic watermark assigner. */
-abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
-
-  /**
-    * Updates the assigner with the next timestamp.
-    *
-    * @param timestamp The next timestamp to update the assigner.
-    */
-  def nextTimestamp(timestamp: Long): Unit
-
-  /**
-    * Returns the current watermark.
-    *
-    * @return The current watermark.
-    */
-  def getWatermark: Watermark
-}
-
-/** A punctuated watermark assigner. */
-abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
-
-  /**
-    * Returns the watermark for the current row or null if no watermark should be generated.
-    *
-    * @param row The current row.
-    * @param timestamp The value of the timestamp attribute for the row.
-    * @return The watermark for this row or null if no watermark should be generated.
-    */
-  def getWatermark(row: Row, timestamp: Long): Watermark
-}
-
-/**
-  * A watermark assigner for ascending rowtime attributes.
-  *
-  * Emits a watermark of the maximum observed timestamp so far minus 1.
-  * Rows that have a timestamp equal to the max timestamp are not late.
-  */
-class AscendingWatermarks extends PeriodicWatermarkAssigner {
-
-  var maxTimestamp: Long = Long.MinValue + 1
-
-  override def nextTimestamp(timestamp: Long): Unit = {
-    if (timestamp > maxTimestamp) {
-      maxTimestamp = timestamp
-    }
-  }
-
-  override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
-}
-
-/**
-  * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval.
-  *
-  * Emits watermarks which are the maximum observed timestamp minus the specified delay.
-  *
-  * @param delay The delay by which watermarks are behind the maximum observed timestamp.
-  */
-class BoundedOutOfOrderWatermarks(val delay: Long) extends PeriodicWatermarkAssigner {
-
-  var maxTimestamp: Long = Long.MinValue + delay
-
-  override def nextTimestamp(timestamp: Long): Unit = {
-    if (timestamp > maxTimestamp) {
-      maxTimestamp = timestamp
-    }
-  }
-
-  override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
new file mode 100644
index 0000000..3a947ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+  * A watermark assigner for ascending rowtime attributes.
+  *
+  * Emits a watermark of the maximum observed timestamp so far minus 1.
+  * Rows that have a timestamp equal to the max timestamp are not late.
+  */
+class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+  var maxTimestamp: Long = Long.MinValue + 1
+
+  override def nextTimestamp(timestamp: Long): Unit = {
+    if (timestamp > maxTimestamp) {
+      maxTimestamp = timestamp
+    }
+  }
+
+  override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
new file mode 100644
index 0000000..957daca
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+  * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval.
+  *
+  * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+  *
+  * @param delay The delay by which watermarks are behind the maximum observed timestamp.
+  */
+class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
+
+  var maxTimestamp: Long = Long.MinValue + delay
+
+  override def nextTimestamp(timestamp: Long): Unit = {
+    if (timestamp > maxTimestamp) {
+      maxTimestamp = timestamp
+    }
+  }
+
+  override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
+}