Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/01 01:52:12 UTC
[3/4] flink git commit: [FLINK-6563] [table] Add builders with time attribute support for KafkaTableSources.
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 5c9a629..0cc9801 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -35,6 +37,7 @@ import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -43,31 +46,63 @@ import java.util.Properties;
* <p>The version-specific Kafka consumers need to extend this class and
 * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
*/
-public abstract class KafkaAvroTableSource extends KafkaTableSource {
+public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {
+
+ private final Class<? extends SpecificRecordBase> avroRecordClass;
+
+ private Map<String, String> fieldMapping;
/**
* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param avroClass Avro specific record.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param schema Schema of the produced table.
+ * @param avroRecordClass Class of the Avro record that is read from the Kafka topic.
*/
- KafkaAvroTableSource(
+ protected KafkaAvroTableSource(
String topic,
Properties properties,
- Class<? extends SpecificRecordBase> avroClass) {
+ TableSchema schema,
+ Class<? extends SpecificRecordBase> avroRecordClass) {
super(
topic,
properties,
- createDeserializationSchema(avroClass),
- convertToRowTypeInformation(avroClass));
+ schema,
+ convertToRowTypeInformation(avroRecordClass));
+
+ this.avroRecordClass = avroRecordClass;
+ }
+
+ @Override
+ public Map<String, String> getFieldMapping() {
+ return fieldMapping;
+ }
+
+ @Override
+ public String explainSource() {
+ return "KafkaAvroTableSource(" + this.avroRecordClass.getSimpleName() + ")";
}
- private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
- return new AvroRowDeserializationSchema(record);
+ @Override
+ protected AvroRowDeserializationSchema getDeserializationSchema() {
+ return new AvroRowDeserializationSchema(avroRecordClass);
}
+ //////// SETTERS FOR OPTIONAL PARAMETERS
+
+ /**
+ * Configures a field mapping for this TableSource.
+ *
+ * @param fieldMapping The field mapping.
+ */
+ protected void setFieldMapping(Map<String, String> fieldMapping) {
+ this.fieldMapping = fieldMapping;
+ }
+
+ //////// HELPER METHODS
+
/**
* Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
* Replaces generic Utf8 with basic String type information.
@@ -105,4 +140,61 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource {
}
return extracted;
}
+
+ /**
+ * Abstract builder for a {@link KafkaAvroTableSource} to be extended by builders of subclasses of
+ * KafkaAvroTableSource.
+ *
+ * @param <T> Type of the KafkaAvroTableSource produced by the builder.
+ * @param <B> Type of the KafkaAvroTableSource.Builder subclass.
+ */
+ protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder>
+ extends KafkaTableSource.Builder<T, B> {
+
+ private Class<? extends SpecificRecordBase> avroClass;
+
+ private Map<String, String> fieldMapping;
+
+ /**
+	 * Sets the class of the Avro records that are read from the Kafka topic.
+ *
+ * @param avroClass The class of the Avro records that are read from the Kafka topic.
+ * @return The builder.
+ */
+ public B forAvroRecordClass(Class<? extends SpecificRecordBase> avroClass) {
+ this.avroClass = avroClass;
+ return builder();
+ }
+
+ /**
+	 * Sets a mapping from table schema fields to fields of the Avro record.
+	 *
+	 * <p>A field mapping is required if the fields of the produced table should be named differently from
+	 * the fields of the Avro record.
+ * The key of the provided Map refers to the field of the table schema,
+ * the value to the field of the Avro record.</p>
+ *
+ * @param schemaToAvroMapping A mapping from schema fields to Avro fields.
+ * @return The builder.
+ */
+ public B withTableToAvroMapping(Map<String, String> schemaToAvroMapping) {
+ this.fieldMapping = schemaToAvroMapping;
+ return builder();
+ }
+
+ /**
+ * Returns the configured Avro class.
+ *
+ * @return The configured Avro class.
+ */
+ protected Class<? extends SpecificRecordBase> getAvroRecordClass() {
+ return this.avroClass;
+ }
+
+ @Override
+ protected void configureTableSource(T source) {
+ super.configureTableSource(source);
+ source.setFieldMapping(this.fieldMapping);
+ }
+ }
}
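
For illustration, a minimal usage sketch of the Avro builder defined above. The version-specific class Kafka010AvroTableSource, its static builder() entry point, the public build(), and the ClickRecord Avro class are hypothetical placeholders; the builder methods themselves (forTopic, withKafkaProperties, withSchema, forAvroRecordClass, withTableToAvroMapping) are the ones added by this patch.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;

public class AvroSourceExample {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "demo");

		// Schema of the produced table.
		TableSchema schema = TableSchema.builder()
			.field("user", Types.LONG())
			.field("message", Types.STRING())
			.build();

		// Key: table schema field, value: Avro record field.
		Map<String, String> mapping = new HashMap<>();
		mapping.put("user", "userId");
		mapping.put("message", "body");

		KafkaAvroTableSource source = Kafka010AvroTableSource.builder() // hypothetical subclass
			.forTopic("clicks")
			.withKafkaProperties(props)
			.withSchema(schema)
			.forAvroRecordClass(ClickRecord.class) // hypothetical SpecificRecordBase subclass
			.withTableToAvroMapping(mapping)
			.build();
	}
}
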
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 1c8e0a0..a91cc25 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -18,12 +18,14 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
+import java.util.Map;
import java.util.Properties;
/**
@@ -34,38 +36,159 @@ import java.util.Properties;
*
 * <p>The field names and types are used to parse the JSON messages.
*/
-public abstract class KafkaJsonTableSource extends KafkaTableSource {
+public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {
+
+ private TableSchema jsonSchema;
+
+ private Map<String, String> fieldMapping;
+
+ private boolean failOnMissingField;
/**
* Creates a generic Kafka JSON {@link StreamTableSource}.
*
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param tableSchema The schema of the table.
+ * @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
- KafkaJsonTableSource(
- String topic,
- Properties properties,
- TypeInformation<Row> typeInfo) {
+ protected KafkaJsonTableSource(
+ String topic,
+ Properties properties,
+ TableSchema tableSchema,
+ TableSchema jsonSchema) {
+
+ super(
+ topic,
+ properties,
+ tableSchema,
+ jsonSchemaToReturnType(jsonSchema));
+
+ this.jsonSchema = jsonSchema;
+ }
+
+ @Override
+ public Map<String, String> getFieldMapping() {
+ return fieldMapping;
+ }
+
+ @Override
+ protected JsonRowDeserializationSchema getDeserializationSchema() {
+ JsonRowDeserializationSchema deserSchema = new JsonRowDeserializationSchema(jsonSchemaToReturnType(jsonSchema));
+ deserSchema.setFailOnMissingField(failOnMissingField);
+ return deserSchema;
+ }
- super(topic, properties, createDeserializationSchema(typeInfo), typeInfo);
+ @Override
+ public String explainSource() {
+ return "KafkaJSONTableSource";
}
+ //////// SETTERS FOR OPTIONAL PARAMETERS
+
/**
- * Configures the failure behaviour if a JSON field is missing.
+ * Sets the flag that specifies the behavior in case of missing fields.
+	 * If set to true, the TableSource fails on missing fields; if set to false, a missing field is set to null.
*
- * <p>By default, a missing field is ignored and the field is set to null.
+ * @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ */
+ protected void setFailOnMissingField(boolean failOnMissingField) {
+ this.failOnMissingField = failOnMissingField;
+ }
+
+ /**
+ * Sets the mapping from table schema fields to JSON schema fields.
*
- * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+ * @param fieldMapping The mapping from table schema fields to JSON schema fields.
*/
- public void setFailOnMissingField(boolean failOnMissingField) {
- JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
- deserializationSchema.setFailOnMissingField(failOnMissingField);
+ protected void setFieldMapping(Map<String, String> fieldMapping) {
+ this.fieldMapping = fieldMapping;
+ }
+
+ //////// HELPER METHODS
+
+	/** Converts the JSON schema into the return type. */
+ private static RowTypeInfo jsonSchemaToReturnType(TableSchema jsonSchema) {
+ return new RowTypeInfo(jsonSchema.getTypes(), jsonSchema.getColumnNames());
}
- private static JsonRowDeserializationSchema createDeserializationSchema(TypeInformation<Row> typeInfo) {
+ /**
+ * Abstract builder for a {@link KafkaJsonTableSource} to be extended by builders of subclasses of
+ * KafkaJsonTableSource.
+ *
+ * @param <T> Type of the KafkaJsonTableSource produced by the builder.
+ * @param <B> Type of the KafkaJsonTableSource.Builder subclass.
+ */
+ protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder>
+ extends KafkaTableSource.Builder<T, B> {
+
+ private TableSchema jsonSchema;
+
+ private Map<String, String> fieldMapping;
+
+ private boolean failOnMissingField = false;
+
+ /**
+ * Sets the schema of the JSON-encoded Kafka messages.
+ * If not set, the JSON messages are decoded with the table schema.
+ *
+ * @param jsonSchema The schema of the JSON-encoded Kafka messages.
+ * @return The builder.
+ */
+ public B forJsonSchema(TableSchema jsonSchema) {
+ this.jsonSchema = jsonSchema;
+ return builder();
+ }
+
+ /**
+ * Sets a mapping from schema fields to fields of the JSON schema.
+ *
+		 * <p>A field mapping is required if the fields of the produced table should be named differently from
+		 * the fields of the JSON records.
+ * The key of the provided Map refers to the field of the table schema,
+ * the value to the field in the JSON schema.</p>
+ *
+ * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
+ * @return The builder.
+ */
+ public B withTableToJsonMapping(Map<String, String> tableToJsonMapping) {
+ this.fieldMapping = tableToJsonMapping;
+ return builder();
+ }
+
+ /**
+		 * Sets the flag that specifies whether to fail if a field is missing.
+ *
+		 * @param failOnMissingField If set to true, the TableSource fails if a field is missing.
+ * If set to false, a missing field is set to null.
+ * @return The builder.
+ */
+ public B failOnMissingField(boolean failOnMissingField) {
+ this.failOnMissingField = failOnMissingField;
+ return builder();
+ }
+
+ /**
+ * Returns the configured JSON schema. If no JSON schema was configured, the table schema
+ * is returned.
+ *
+ * @return The JSON schema for the TableSource.
+ */
+ protected TableSchema getJsonSchema() {
+ if (jsonSchema != null) {
+ return this.jsonSchema;
+ } else {
+ return getTableSchema();
+ }
+ }
- return new JsonRowDeserializationSchema(typeInfo);
+ @Override
+ protected void configureTableSource(T source) {
+ super.configureTableSource(source);
+ // configure field mapping
+ source.setFieldMapping(this.fieldMapping);
+ // configure missing field behavior
+ source.setFailOnMissingField(this.failOnMissingField);
+ }
}
}
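
A minimal sketch of the JSON builder added above. Kafka010JsonTableSource (with a static builder() and a public build()) is a hypothetical version-specific subclass; forJsonSchema, withTableToJsonMapping, and failOnMissingField are the methods defined in this patch.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;

public class JsonSourceExample {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "demo");

		// Schema of the produced table.
		TableSchema tableSchema = TableSchema.builder()
			.field("id", Types.LONG())
			.field("name", Types.STRING())
			.build();

		// Schema of the JSON messages, with differently named fields.
		TableSchema jsonSchema = TableSchema.builder()
			.field("objId", Types.LONG())
			.field("objName", Types.STRING())
			.build();

		// Key: table schema field, value: JSON schema field.
		Map<String, String> mapping = new HashMap<>();
		mapping.put("id", "objId");
		mapping.put("name", "objName");

		KafkaJsonTableSource source = Kafka010JsonTableSource.builder() // hypothetical subclass
			.forTopic("orders")
			.withKafkaProperties(props)
			.withSchema(tableSchema)
			.forJsonSchema(jsonSchema)
			.withTableToJsonMapping(mapping)
			.failOnMissingField(true) // fail instead of padding missing fields with null
			.build();
	}
}
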
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index a9cf235..0bd04e4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -23,19 +23,35 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.DefinedProctimeAttribute;
+import org.apache.flink.table.sources.DefinedRowtimeAttributes;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
+import scala.Option;
+
/**
* A version-agnostic Kafka {@link StreamTableSource}.
*
* <p>The version-specific Kafka consumers need to extend this class and
 * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
*/
-public abstract class KafkaTableSource implements StreamTableSource<Row> {
+public abstract class KafkaTableSource
+ implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+
+ /** The schema of the table. */
+ private final TableSchema schema;
/** The Kafka topic to consume. */
private final String topic;
@@ -43,30 +59,33 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
/** Properties for the Kafka consumer. */
private final Properties properties;
- /** Deserialization schema to use for Kafka records. */
- private final DeserializationSchema<Row> deserializationSchema;
-
/** Type information describing the result type. */
- private final TypeInformation<Row> typeInfo;
+ private TypeInformation<Row> returnType;
+
+ /** Field name of the processing time attribute, null if no processing time field is defined. */
+ private String proctimeAttribute;
+
+ /** Descriptor for a rowtime attribute. */
+ private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param typeInfo Type information describing the result type.
+ * @param schema Schema of the produced table.
+ * @param returnType Type information of the produced physical DataStream.
*/
- KafkaTableSource(
+ protected KafkaTableSource(
String topic,
Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- TypeInformation<Row> typeInfo) {
+ TableSchema schema,
+ TypeInformation<Row> returnType) {
- this.topic = Preconditions.checkNotNull(topic, "Topic");
- this.properties = Preconditions.checkNotNull(properties, "Properties");
- this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
- this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information");
+ this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+ this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+ this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
+ this.returnType = Preconditions.checkNotNull(returnType, "Type information must not be null.");
}
/**
@@ -75,6 +94,8 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
*/
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+
+ DeserializationSchema<Row> deserializationSchema = getDeserializationSchema();
// Version-specific Kafka consumer
FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
return env.addSource(kafkaConsumer);
@@ -82,14 +103,65 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
@Override
public TypeInformation<Row> getReturnType() {
- return typeInfo;
+ return returnType;
}
@Override
public TableSchema getTableSchema() {
- return TableSchema.fromTypeInfo(typeInfo);
+ return schema;
+ }
+
+ @Override
+ public String getProctimeAttribute() {
+ return proctimeAttribute;
+ }
+
+ @Override
+ public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
+ return rowtimeAttributeDescriptors;
}
+ //////// SETTERS FOR OPTIONAL PARAMETERS
+
+ /**
+ * Declares a field of the schema to be the processing time attribute.
+ *
+ * @param proctimeAttribute The name of the field that becomes the processing time field.
+ */
+ protected void setProctimeAttribute(String proctimeAttribute) {
+ if (proctimeAttribute != null) {
+ // validate that field exists and is of correct type
+ Option<TypeInformation<?>> tpe = schema.getType(proctimeAttribute);
+ if (tpe.isEmpty()) {
+ throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not present in TableSchema.");
+ } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
+ throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not of type SQL_TIMESTAMP.");
+ }
+ }
+ this.proctimeAttribute = proctimeAttribute;
+ }
+
+ /**
+ * Declares a list of fields to be rowtime attributes.
+ *
+ * @param rowtimeAttributeDescriptors The descriptors of the rowtime attributes.
+ */
+ protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
+ // validate that all declared fields exist and are of correct type
+ for (RowtimeAttributeDescriptor desc : rowtimeAttributeDescriptors) {
+ String rowtimeAttribute = desc.getAttributeName();
+ Option<TypeInformation<?>> tpe = schema.getType(rowtimeAttribute);
+ if (tpe.isEmpty()) {
+ throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not present in TableSchema.");
+ } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
+ throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not of type SQL_TIMESTAMP.");
+ }
+ }
+ this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
+ }
+
+ //////// ABSTRACT METHODS FOR SUBCLASSES
+
/**
* Returns the version-specific Kafka consumer.
*
@@ -108,12 +180,195 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
*
* @return The deserialization schema
*/
- protected DeserializationSchema<Row> getDeserializationSchema() {
- return deserializationSchema;
- }
+ protected abstract DeserializationSchema<Row> getDeserializationSchema();
- @Override
- public String explainSource() {
- return "";
+ /**
+ * Abstract builder for a {@link KafkaTableSource} to be extended by builders of subclasses of
+ * KafkaTableSource.
+ *
+ * @param <T> Type of the KafkaTableSource produced by the builder.
+ * @param <B> Type of the KafkaTableSource.Builder subclass.
+ */
+ protected abstract static class Builder<T extends KafkaTableSource, B extends KafkaTableSource.Builder> {
+
+ private String topic;
+
+ private Properties kafkaProps;
+
+ private TableSchema schema;
+
+ private String proctimeAttribute;
+
+ private RowtimeAttributeDescriptor rowtimeAttributeDescriptor;
+
+ /**
+ * Sets the topic from which the table is read.
+ *
+ * @param topic The topic from which the table is read.
+ * @return The builder.
+ */
+ public B forTopic(String topic) {
+ Preconditions.checkNotNull(topic, "Topic must not be null.");
+ Preconditions.checkArgument(this.topic == null, "Topic has already been set.");
+ this.topic = topic;
+ return builder();
+ }
+
+ /**
+ * Sets the configuration properties for the Kafka consumer.
+ *
+ * @param props The configuration properties for the Kafka consumer.
+ * @return The builder.
+ */
+ public B withKafkaProperties(Properties props) {
+ Preconditions.checkNotNull(props, "Properties must not be null.");
+ Preconditions.checkArgument(this.kafkaProps == null, "Properties have already been set.");
+ this.kafkaProps = props;
+ return builder();
+ }
+
+ /**
+ * Sets the schema of the produced table.
+ *
+ * @param schema The schema of the produced table.
+ * @return The builder.
+ */
+ public B withSchema(TableSchema schema) {
+ Preconditions.checkNotNull(schema, "Schema must not be null.");
+ Preconditions.checkArgument(this.schema == null, "Schema has already been set.");
+ this.schema = schema;
+ return builder();
+ }
+
+ /**
+ * Configures a field of the table to be a processing time attribute.
+		 * The configured field must be present in the table schema and of type {@link Types#SQL_TIMESTAMP()}.
+ *
+ * @param proctimeAttribute The name of the processing time attribute in the table schema.
+ * @return The builder.
+ */
+ public B withProctimeAttribute(String proctimeAttribute) {
+ Preconditions.checkNotNull(proctimeAttribute, "Proctime attribute must not be null.");
+ Preconditions.checkArgument(!proctimeAttribute.isEmpty(), "Proctime attribute must not be empty.");
+ Preconditions.checkArgument(this.proctimeAttribute == null, "Proctime attribute has already been set.");
+ this.proctimeAttribute = proctimeAttribute;
+ return builder();
+ }
+
+ /**
+ * Configures a field of the table to be a rowtime attribute.
+		 * The configured field must be present in the table schema and of type {@link Types#SQL_TIMESTAMP()}.
+ *
+ * @param rowtimeAttribute The name of the rowtime attribute in the table schema.
+ * @param timestampExtractor The {@link TimestampExtractor} to extract the rowtime attribute from the physical type.
+ * @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
+ * @return The builder.
+ */
+ public B withRowtimeAttribute(
+ String rowtimeAttribute,
+ TimestampExtractor timestampExtractor,
+ WatermarkStrategy watermarkStrategy) {
+ Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
+ Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
+ Preconditions.checkNotNull(timestampExtractor, "Timestamp extractor must not be null.");
+ Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
+ Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
+ "Currently, only one rowtime attribute is supported.");
+
+ this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
+ rowtimeAttribute,
+ timestampExtractor,
+ watermarkStrategy);
+ return builder();
+ }
+
+ /**
+ * Configures the Kafka timestamp to be a rowtime attribute.
+ *
+ * <p>Note: Kafka supports message timestamps only since version 0.10.</p>
+ *
+ * @param rowtimeAttribute The name of the rowtime attribute in the table schema.
+ * @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
+ * @return The builder.
+ */
+ public B withKafkaTimestampAsRowtimeAttribute(
+ String rowtimeAttribute,
+ WatermarkStrategy watermarkStrategy) {
+
+ Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
+ Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
+ Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
+ Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
+ "Currently, only one rowtime attribute is supported.");
+ Preconditions.checkArgument(supportsKafkaTimestamps(), "Kafka timestamps are only supported since Kafka 0.10.");
+
+ this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
+ rowtimeAttribute,
+ new StreamRecordTimestamp(),
+ watermarkStrategy);
+ return builder();
+ }
+
+ /**
+ * Returns the configured topic.
+ *
+ * @return the configured topic.
+ */
+ protected String getTopic() {
+ return this.topic;
+ }
+
+ /**
+ * Returns the configured Kafka properties.
+ *
+ * @return the configured Kafka properties.
+ */
+ protected Properties getKafkaProps() {
+ return this.kafkaProps;
+ }
+
+ /**
+ * Returns the configured table schema.
+ *
+ * @return the configured table schema.
+ */
+ protected TableSchema getTableSchema() {
+ return this.schema;
+ }
+
+ /**
+		 * Returns whether the KafkaTableSource supports Kafka message timestamps.
+		 *
+		 * @return True if the KafkaTableSource supports Kafka timestamps, false otherwise.
+ */
+ protected abstract boolean supportsKafkaTimestamps();
+
+ /**
+ * Configures a TableSource with optional parameters.
+ *
+ * @param tableSource The TableSource to configure.
+ */
+ protected void configureTableSource(T tableSource) {
+ // configure processing time attributes
+ tableSource.setProctimeAttribute(proctimeAttribute);
+ // configure rowtime attributes
+ if (rowtimeAttributeDescriptor == null) {
+ tableSource.setRowtimeAttributeDescriptors(Collections.emptyList());
+ } else {
+ tableSource.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
+ }
+ }
+
+ /**
+ * Returns the builder.
+ * @return the builder.
+ */
+ protected abstract B builder();
+
+ /**
+ * Builds the configured {@link KafkaTableSource}.
+ * @return The configured {@link KafkaTableSource}.
+ */
+ protected abstract KafkaTableSource build();
}
}
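
The time attribute support is the core of this change: a table field is declared a processing time or event time attribute through the builder, and the setters validate that the field exists in the schema with type SQL_TIMESTAMP. A minimal sketch, assuming a hypothetical version-specific subclass Kafka010JsonTableSource; ExistingField and AscendingTimestamps are the timestamp extractor and watermark strategy also used by the tests below.

import java.util.Properties;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

public class TimeAttributeExample {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "demo");

		// Time attribute fields must be part of the schema and of type
		// SQL_TIMESTAMP; otherwise the setters throw a ValidationException.
		TableSchema schema = TableSchema.builder()
			.field("id", Types.LONG())
			.field("eventTime", Types.SQL_TIMESTAMP())
			.field("procTime", Types.SQL_TIMESTAMP())
			.build();

		KafkaTableSource source = Kafka010JsonTableSource.builder() // hypothetical subclass
			.forTopic("events")
			.withKafkaProperties(props)
			.withSchema(schema)
			.withProctimeAttribute("procTime")
			// Reads the rowtime from the physical "eventTime" field and generates
			// watermarks for ascending timestamps. Alternatively,
			// withKafkaTimestampAsRowtimeAttribute(...) uses the Kafka 0.10+
			// message timestamp instead of a payload field.
			.withRowtimeAttribute("eventTime", new ExistingField("eventTime"), new AscendingTimestamps())
			.build();
	}
}
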
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
new file mode 100644
index 0000000..def16b2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.table.api.Types;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Abstract test base for all Kafka Avro table sources.
+ */
+public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceTestBase {
+
+ @Override
+ protected void configureBuilder(KafkaTableSource.Builder builder) {
+ super.configureBuilder(builder);
+ ((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
+ }
+
+ @Test
+ public void testSameFieldsAvroClass() {
+ KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
+ this.configureBuilder(b);
+
+ KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
+
+ // check return type
+ RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+ assertNotNull(returnType);
+ assertEquals(5, returnType.getArity());
+ // check field names
+ assertEquals("field1", returnType.getFieldNames()[0]);
+ assertEquals("field2", returnType.getFieldNames()[1]);
+ assertEquals("time1", returnType.getFieldNames()[2]);
+ assertEquals("time2", returnType.getFieldNames()[3]);
+ assertEquals("field3", returnType.getFieldNames()[4]);
+ // check field types
+ assertEquals(Types.LONG(), returnType.getTypeAt(0));
+ assertEquals(Types.STRING(), returnType.getTypeAt(1));
+ assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+ assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
+ assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
+
+ // check field mapping
+ assertNull(source.getFieldMapping());
+ }
+
+ @Test
+ public void testDifferentFieldsAvroClass() {
+ KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
+ super.configureBuilder(b);
+ b.withProctimeAttribute("time2");
+
+ Map<String, String> mapping = new HashMap<>();
+ mapping.put("field1", "otherField1");
+ mapping.put("field2", "otherField2");
+ mapping.put("field3", "otherField3");
+
+ // set Avro class with different fields
+ b.forAvroRecordClass(DifferentFieldsAvroClass.class);
+ b.withTableToAvroMapping(mapping);
+
+ KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
+
+ // check return type
+ RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+ assertNotNull(returnType);
+ assertEquals(6, returnType.getArity());
+ // check field names
+ assertEquals("otherField1", returnType.getFieldNames()[0]);
+ assertEquals("otherField2", returnType.getFieldNames()[1]);
+ assertEquals("otherTime1", returnType.getFieldNames()[2]);
+ assertEquals("otherField3", returnType.getFieldNames()[3]);
+ assertEquals("otherField4", returnType.getFieldNames()[4]);
+ assertEquals("otherField5", returnType.getFieldNames()[5]);
+ // check field types
+ assertEquals(Types.LONG(), returnType.getTypeAt(0));
+ assertEquals(Types.STRING(), returnType.getTypeAt(1));
+ assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+ assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
+ assertEquals(Types.BYTE(), returnType.getTypeAt(4));
+ assertEquals(Types.INT(), returnType.getTypeAt(5));
+
+ // check field mapping
+ Map<String, String> fieldMapping = source.getFieldMapping();
+ assertNotNull(fieldMapping);
+ assertEquals(3, fieldMapping.size());
+ assertEquals("otherField1", fieldMapping.get("field1"));
+ assertEquals("otherField2", fieldMapping.get("field2"));
+ assertEquals("otherField3", fieldMapping.get("field3"));
+ }
+
+ /**
+ * Avro record that matches the table schema.
+ */
+ @SuppressWarnings("unused")
+ public static class SameFieldsAvroClass extends SpecificRecordBase {
+
+ //CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
+ public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
+ //CHECKSTYLE.ON: StaticVariableNameCheck
+
+ public Long field1;
+ public String field2;
+ public Timestamp time1;
+ public Timestamp time2;
+ public Double field3;
+
+ @Override
+ public Schema getSchema() {
+ return null;
+ }
+
+ @Override
+ public Object get(int field) {
+ return null;
+ }
+
+ @Override
+ public void put(int field, Object value) { }
+ }
+
+ /**
+ * Avro record that does NOT match the table schema.
+ */
+ @SuppressWarnings("unused")
+ public static class DifferentFieldsAvroClass extends SpecificRecordBase {
+
+ //CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
+ public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(
+ new String[]{"otherField1", "otherField2", "otherTime1", "otherField3", "otherField4", "otherField5"},
+ new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.DOUBLE(), Types.BYTE(), Types.INT()});
+ //CHECKSTYLE.ON: StaticVariableNameCheck
+
+ public Long otherField1;
+ public String otherField2;
+ public Timestamp otherTime1;
+ public Double otherField3;
+ public Byte otherField4;
+ public Integer otherField5;
+
+ @Override
+ public Schema getSchema() {
+ return null;
+ }
+
+ @Override
+ public Object get(int field) {
+ return null;
+ }
+
+ @Override
+ public void put(int field, Object value) { }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
new file mode 100644
index 0000000..9b867f6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceTestBase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Abstract test base for all Kafka JSON table sources.
+ */
+public abstract class KafkaJsonTableSourceTestBase extends KafkaTableSourceTestBase {
+
+ @Test
+ public void testJsonEqualsTableSchema() {
+ KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
+ this.configureBuilder(b);
+
+ KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
+
+ // check return type
+ RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+ assertNotNull(returnType);
+ assertEquals(5, returnType.getArity());
+ // check field names
+ assertEquals("field1", returnType.getFieldNames()[0]);
+ assertEquals("field2", returnType.getFieldNames()[1]);
+ assertEquals("time1", returnType.getFieldNames()[2]);
+ assertEquals("time2", returnType.getFieldNames()[3]);
+ assertEquals("field3", returnType.getFieldNames()[4]);
+ // check field types
+ assertEquals(Types.LONG(), returnType.getTypeAt(0));
+ assertEquals(Types.STRING(), returnType.getTypeAt(1));
+ assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
+ assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
+ assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
+
+ // check field mapping
+ assertNull(source.getFieldMapping());
+ }
+
+ @Test
+ public void testCustomJsonSchemaWithMapping() {
+ KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
+ super.configureBuilder(b);
+ b.withProctimeAttribute("time2");
+
+ Map<String, String> mapping = new HashMap<>();
+ mapping.put("field1", "otherField1");
+ mapping.put("field2", "otherField2");
+ mapping.put("field3", "otherField3");
+
+		// set JSON schema with different fields
+ b.forJsonSchema(TableSchema.builder()
+ .field("otherField1", Types.LONG())
+ .field("otherField2", Types.STRING())
+ .field("rowtime", Types.LONG())
+ .field("otherField3", Types.DOUBLE())
+ .field("otherField4", Types.BYTE())
+ .field("otherField5", Types.INT()).build());
+ b.withTableToJsonMapping(mapping);
+ b.withRowtimeAttribute("time1", new ExistingField("timeField1"), new AscendingTimestamps());
+
+ KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
+
+ // check return type
+ RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
+ assertNotNull(returnType);
+ assertEquals(6, returnType.getArity());
+ // check field names
+ assertEquals("otherField1", returnType.getFieldNames()[0]);
+ assertEquals("otherField2", returnType.getFieldNames()[1]);
+ assertEquals("rowtime", returnType.getFieldNames()[2]);
+ assertEquals("otherField3", returnType.getFieldNames()[3]);
+ assertEquals("otherField4", returnType.getFieldNames()[4]);
+ assertEquals("otherField5", returnType.getFieldNames()[5]);
+ // check field types
+ assertEquals(Types.LONG(), returnType.getTypeAt(0));
+ assertEquals(Types.STRING(), returnType.getTypeAt(1));
+ assertEquals(Types.LONG(), returnType.getTypeAt(2));
+ assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
+ assertEquals(Types.BYTE(), returnType.getTypeAt(4));
+ assertEquals(Types.INT(), returnType.getTypeAt(5));
+
+ // check field mapping
+ Map<String, String> fieldMapping = source.getFieldMapping();
+ assertNotNull(fieldMapping);
+ assertEquals(3, fieldMapping.size());
+ assertEquals("otherField1", fieldMapping.get("field1"));
+ assertEquals("otherField2", fieldMapping.get("field2"));
+ assertEquals("otherField3", fieldMapping.get("field3"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 8028bfc..218401c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -18,20 +18,27 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
import org.junit.Test;
+import java.util.List;
import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -43,69 +50,155 @@ import static org.mockito.Mockito.verify;
*/
public abstract class KafkaTableSourceTestBase {
- private static final String TOPIC = "testTopic";
- private static final String[] FIELD_NAMES = new String[] { "mylong", "mystring", "myboolean", "mydouble", "missingField" };
- private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO };
- private static final Properties PROPERTIES = createSourceProperties();
-
- /**
- * Avro record that matches above schema.
- */
- public static class AvroSpecificRecord extends SpecificRecordBase {
-
- //CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
- public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
- //CHECKSTYLE.ON: StaticVariableNameCheck
-
- public Long mylong;
- public String mystring;
- public Boolean myboolean;
- public Double mydouble;
- public Long missingField;
-
- @Override
- public Schema getSchema() {
- return null;
- }
-
- @Override
- public Object get(int field) {
- return null;
- }
+ static final String[] FIELD_NAMES =
+ new String[]{"field1", "field2", "time1", "time2", "field3"};
+ static final TypeInformation[] FIELD_TYPES =
+ new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.DOUBLE()};
- @Override
- public void put(int field, Object value) {
-
- }
- }
+ private static final String TOPIC = "testTopic";
+ private static final TableSchema SCHEMA = new TableSchema(FIELD_NAMES, FIELD_TYPES);
+ private static final Properties PROPS = createSourceProperties();
@Test
- public void testKafkaTableSource() {
- KafkaTableSource kafkaTableSource = spy(createTableSource());
+ public void testKafkaConsumer() {
+ KafkaTableSource.Builder b = getBuilder();
+ configureBuilder(b);
+
+		// assert that the correct Kafka consumer is created and used
+ KafkaTableSource observed = spy(b.build());
StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
- kafkaTableSource.getDataStream(env);
+ observed.getDataStream(env);
verify(env).addSource(any(getFlinkKafkaConsumer()));
- verify(kafkaTableSource).getKafkaConsumer(
+ verify(observed).getKafkaConsumer(
eq(TOPIC),
- eq(PROPERTIES),
+ eq(PROPS),
any(getDeserializationSchema()));
}
- protected abstract KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
+ @Test
+ public void testTableSchema() {
+ KafkaTableSource.Builder b = getBuilder();
+ configureBuilder(b);
+
+ KafkaTableSource source = b.build();
+
+ // check table schema
+ TableSchema schema = source.getTableSchema();
+ assertNotNull(schema);
+ assertEquals(5, schema.getColumnNames().length);
+ // check table fields
+ assertEquals("field1", schema.getColumnNames()[0]);
+ assertEquals("field2", schema.getColumnNames()[1]);
+ assertEquals("time1", schema.getColumnNames()[2]);
+ assertEquals("time2", schema.getColumnNames()[3]);
+ assertEquals("field3", schema.getColumnNames()[4]);
+ assertEquals(Types.LONG(), schema.getTypes()[0]);
+ assertEquals(Types.STRING(), schema.getTypes()[1]);
+ assertEquals(Types.SQL_TIMESTAMP(), schema.getTypes()[2]);
+ assertEquals(Types.SQL_TIMESTAMP(), schema.getTypes()[3]);
+ assertEquals(Types.DOUBLE(), schema.getTypes()[4]);
+ }
+
+ @Test
+ public void testNoTimeAttributes() {
+ KafkaTableSource.Builder b = getBuilder();
+ configureBuilder(b);
+
+ KafkaTableSource source = b.build();
+
+ // assert no proctime
+ assertNull(source.getProctimeAttribute());
+ // assert no rowtime
+ assertNotNull(source.getRowtimeAttributeDescriptors());
+ assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
+ }
+
+ @Test
+ public void testProctimeAttribute() {
+ KafkaTableSource.Builder b = getBuilder();
+ configureBuilder(b);
+ b.withProctimeAttribute("time1");
+
+ KafkaTableSource source = b.build();
+
+ // assert correct proctime field
+ assertEquals(source.getProctimeAttribute(), "time1");
+
+ // assert no rowtime
+ assertNotNull(source.getRowtimeAttributeDescriptors());
+ assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
+ }
+
+ @Test
+ public void testRowtimeAttribute() {
+ KafkaTableSource.Builder b = getBuilder();
+ configureBuilder(b);
+ b.withRowtimeAttribute("time2", new ExistingField("time2"), new AscendingTimestamps());
+
+ KafkaTableSource source = b.build();
+
+ // assert no proctime
+ assertNull(source.getProctimeAttribute());
+
+ // assert correct rowtime descriptor
+ List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
+ assertNotNull(descs);
+ assertEquals(1, descs.size());
+ RowtimeAttributeDescriptor desc = descs.get(0);
+ assertEquals("time2", desc.getAttributeName());
+ // assert timestamp extractor
+ assertTrue(desc.getTimestampExtractor() instanceof ExistingField);
+ assertEquals(1, desc.getTimestampExtractor().getArgumentFields().length);
+ assertEquals("time2", desc.getTimestampExtractor().getArgumentFields()[0]);
+ // assert watermark strategy
+ assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
+ }
+
+ @Test
+ public void testKafkaTSRowtimeAttribute() {
+ KafkaTableSource.Builder b = getBuilder();
+ configureBuilder(b);
+
+ try {
+ b.withKafkaTimestampAsRowtimeAttribute("time2", new AscendingTimestamps());
+
+ KafkaTableSource source = b.build();
+
+ // assert no proctime
+ assertNull(source.getProctimeAttribute());
+
+ // assert correct rowtime descriptor
+ List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
+ assertNotNull(descs);
+ assertEquals(1, descs.size());
+ RowtimeAttributeDescriptor desc = descs.get(0);
+ assertEquals("time2", desc.getAttributeName());
+ // assert timestamp extractor
+ assertTrue(desc.getTimestampExtractor() instanceof StreamRecordTimestamp);
+ assertTrue(desc.getTimestampExtractor().getArgumentFields().length == 0);
+ // assert watermark strategy
+ assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
+ } catch (Exception e) {
+ if (b.supportsKafkaTimestamps()) {
+ // builder should support Kafka timestamps
+ fail();
+ }
+ }
+ }
+
+ protected abstract KafkaTableSource.Builder getBuilder();
protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
- private KafkaTableSource createTableSource() {
- return createTableSource(TOPIC, PROPERTIES, Types.ROW(FIELD_NAMES, FIELD_TYPES));
+ protected void configureBuilder(KafkaTableSource.Builder builder) {
+ builder
+ .forTopic(TOPIC)
+ .withKafkaProperties(PROPS)
+ .withSchema(SCHEMA);
}
private static Properties createSourceProperties() {
@@ -114,4 +207,5 @@ public abstract class KafkaTableSourceTestBase {
properties.setProperty("group.id", "dummy");
return properties;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index f7ef710..f4d928f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -20,6 +20,8 @@ package org.apache.flink.table.api
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
+import _root_.scala.collection.mutable.ArrayBuffer
+
/**
* A TableSchema represents a Table's structure.
*/
@@ -154,4 +156,24 @@ object TableSchema {
}
}
+ def builder(): TableSchemaBuilder = {
+ new TableSchemaBuilder
+ }
+
+}
+
+class TableSchemaBuilder {
+
+ private val fieldNames: ArrayBuffer[String] = new ArrayBuffer[String]()
+ private val fieldTypes: ArrayBuffer[TypeInformation[_]] = new ArrayBuffer[TypeInformation[_]]()
+
+ def field(name: String, tpe: TypeInformation[_]): TableSchemaBuilder = {
+ fieldNames.append(name)
+ fieldTypes.append(tpe)
+ this
+ }
+
+ def build(): TableSchema = {
+ new TableSchema(fieldNames.toArray, fieldTypes.toArray)
+ }
}
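
The new builder is a fluent alternative to the existing array-based constructor and is usable from Java, as the connector tests above show. A short sketch of the equivalence:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;

public class SchemaBuilderExample {
	public static void main(String[] args) {
		// The fluent builder added by this patch ...
		TableSchema viaBuilder = TableSchema.builder()
			.field("user", Types.LONG())
			.field("message", Types.STRING())
			.build();

		// ... is equivalent to the existing array-based constructor.
		TableSchema viaConstructor = new TableSchema(
			new String[]{"user", "message"},
			new TypeInformation[]{Types.LONG(), Types.STRING()});
	}
}
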
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index be7f84f..a794b08 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -33,13 +33,13 @@ import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator}
import org.apache.flink.table.codegen.calls.ScalarOperators._
-import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions}
+import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
@@ -241,9 +241,9 @@ abstract class CodeGenerator(
*
* @param returnType conversion target type. Inputs and output must have the same arity.
* @param resultFieldNames result field names necessary for a mapping to POJO fields.
- * @param rowtimeExpression an optional expression to extract the value of a rowtime field from
- * the input data. If not set, the value of rowtime field is set to the
- * StreamRecord timestamp.
+ * @param rowtimeExpression an expression to extract the value of a rowtime field from
+	  *                          the input data. Required if the field indices include a rowtime
+ * marker.
* @return instance of GeneratedExpression
*/
def generateConverterResultExpression(
@@ -254,16 +254,12 @@ abstract class CodeGenerator(
val input1AccessExprs = input1Mapping.map {
case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
+ TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER if rowtimeExpression.isDefined =>
+ // generate rowtime attribute from expression
+ generateExpression(rowtimeExpression.get)
+ case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
- // attribute is a rowtime indicator.
- rowtimeExpression match {
- case Some(expr) =>
- // generate rowtime attribute from expression
- generateExpression(expr)
- case _ =>
- // fetch rowtime attribute from StreamRecord timestamp field
- generateRowtimeAccess()
- }
+ throw TableException("Rowtime extraction expression missing. Please report a bug.")
case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
// attribute is proctime indicator.
// we use a null literal and generate a timestamp when we need it.
@@ -845,7 +841,7 @@ abstract class CodeGenerator(
val operand = operands.head
requireTimeInterval(operand)
generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand)
-
+
// comparison
case EQUALS =>
val left = operands.head
@@ -994,6 +990,9 @@ abstract class CodeGenerator(
case ScalarSqlFunctions.CONCAT_WS =>
generateConcatWs(operands)
+ case StreamRecordTimestampSqlFunction =>
+ generateStreamRecordRowtimeAccess()
+
// advanced scalar functions
case sqlOperator: SqlOperator =>
val callGen = FunctionGenerator.getCallGenerator(
@@ -1298,7 +1297,7 @@ abstract class CodeGenerator(
}
}
- private[flink] def generateRowtimeAccess(): GeneratedExpression = {
+ private[flink] def generateStreamRecordRowtimeAccess(): GeneratedExpression = {
val resultTerm = newName("result")
val nullTerm = newName("isNull")
@@ -1313,7 +1312,7 @@ abstract class CodeGenerator(
|boolean $nullTerm = false;
""".stripMargin
- GeneratedExpression(resultTerm, nullTerm, accessCode, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+ GeneratedExpression(resultTerm, nullTerm, accessCode, Types.LONG)
}
private[flink] def generateProctimeTimestamp(): GeneratedExpression = {
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index bad5889..f3ef039 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.functions.sql.StreamRecordTimestampSqlFunction
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
@@ -163,9 +164,9 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
override private[flink] def validateInput(): ValidationResult = {
child match {
- case WindowReference(_, Some(tpe)) if isProctimeIndicatorType(tpe) =>
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isProctimeIndicatorType(tpe) =>
ValidationFailure("A proctime window cannot provide a rowtime attribute.")
- case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
// rowtime window
ValidationSuccess
case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
@@ -181,7 +182,7 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
override def resultType: TypeInformation[_] = {
child match {
- case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
// rowtime window
TimeIndicatorTypeInfo.ROWTIME_INDICATOR
case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
@@ -203,7 +204,7 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
override private[flink] def validateInput(): ValidationResult = {
child match {
- case WindowReference(_, Some(tpe)) if isTimeIndicatorType(tpe) =>
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isTimeIndicatorType(tpe) =>
ValidationSuccess
case WindowReference(_, _) =>
ValidationFailure("Reference to a rowtime or proctime window required.")
@@ -221,3 +222,13 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
override def toString: String = s"proctime($child)"
}
+
+/** Expression to access the timestamp of a StreamRecord. */
+case class StreamRecordTimestamp() extends LeafExpression {
+
+ override private[flink] def resultType = Types.LONG
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.getRexBuilder.makeCall(StreamRecordTimestampSqlFunction)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala
new file mode 100644
index 0000000..10be75c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/StreamRecordTimestampSqlFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.sql
+
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+
+/**
+ * Function to access the timestamp of a StreamRecord.
+ */
+object StreamRecordTimestampSqlFunction extends SqlFunction(
+ "STREAMRECORD_TIMESTAMP",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(SqlTypeName.BIGINT),
+ InferTypes.RETURN_TYPE,
+ OperandTypes.family(SqlTypeFamily.NUMERIC),
+ SqlFunctionCategory.SYSTEM) {
+
+ override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+ override def isDeterministic: Boolean = true
+}
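
[Editor's note] Taken together, the two files above form a small pipeline: the
StreamRecordTimestamp expression translates into a RexCall on
STREAMRECORD_TIMESTAMP, which the CodeGenerator then matches and compiles via
generateStreamRecordRowtimeAccess(). A hedged sketch of the translation step;
it is placed in a sub-package of org.apache.flink because toRexNode is
private[flink], and it assumes a Calcite RelBuilder positioned on the scanned
input:

package org.apache.flink.table.sketch

import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.table.expressions.StreamRecordTimestamp

object RowtimeRexSketch {
  /** Builds the BIGINT RexCall that the code generator recognizes. */
  def rowtimeRex(implicit relBuilder: RelBuilder): RexNode =
    StreamRecordTimestamp().toRexNode
}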
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 59b3120..f4bd3b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -22,11 +22,14 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex.RexNode
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.expressions.Cast
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.schema.DataStreamTable
import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
/**
* Flink RelNode which matches along with DataStreamSource.
@@ -61,7 +64,22 @@ class DataStreamScan(
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
val fieldIdxs = dataStreamTable.fieldIndexes
- convertToInternalRow(schema, inputDataStream, fieldIdxs, config, None)
+
+ // get expression to extract timestamp
+ val rowtimeExpr: Option[RexNode] =
+ if (fieldIdxs.contains(TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER)) {
+ // extract timestamp from StreamRecord
+ Some(
+ Cast(
+ org.apache.flink.table.expressions.StreamRecordTimestamp(),
+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+ .toRexNode(tableEnv.getRelBuilder))
+ } else {
+ None
+ }
+
+ // convert DataStream
+ convertToInternalRow(schema, inputDataStream, fieldIdxs, config, rowtimeExpr)
}
}
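
[Editor's note] For context, this scan path fires when a DataStream is
converted to a Table with a rowtime attribute. A hedged usage sketch in the
Scala Table API of this Flink version ('rt.rowtime marks the attribute;
timestamps and watermarks are assumed to be assigned on the stream upstream):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object RowtimeScanSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2))
    // 'rt maps to ROWTIME_STREAM_MARKER in the field indexes; the scan above
    // then injects Cast(StreamRecordTimestamp(), ROWTIME_INDICATOR) for it
    val table = stream.toTable(tEnv, 'user, 'cnt, 'rt.rowtime)
    tEnv.registerTable("T", table)
  }
}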
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 46acd6c..5d305b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sources._
+import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
index 6a2ccc9..fcfc2ca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
@@ -22,6 +22,7 @@ import java.util.{Map => JMap}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
/**
* The [[DefinedFieldMapping]] interface provides a mapping for the fields of the table schema
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
index bfc06f9..f09baa3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -22,6 +22,8 @@ import java.util
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy
/**
* Extends a [[TableSource]] to specify a processing time attribute.
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
deleted file mode 100644
index e0f01d5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
-
-/**
- * Provides an expression to extract the timestamp for a rowtime attribute.
- */
-abstract class TimestampExtractor extends FieldComputer[Long] {
-
- /** Timestamp extractors compute the timestamp as Long. */
- override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]
-}
-
-/**
- * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
- *
- * @param field The field to convert into a rowtime attribute.
- */
-class ExistingField(field: String) extends TimestampExtractor {
-
- override def getArgumentFields: Array[String] = Array(field)
-
- @throws[ValidationException]
- override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
-
- // get type of field to convert
- val fieldType = physicalFieldTypes(0)
-
- // check that the field to convert is of type Long or Timestamp
- fieldType match {
- case Types.LONG => // OK
- case Types.SQL_TIMESTAMP => // OK
- case _: TypeInformation[_] =>
- throw ValidationException(
- s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
- }
- }
-
- /**
- * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
- * rowtime attribute.
- */
- def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-
- val fieldAccess: Expression = fieldAccesses(0)
-
- fieldAccess.resultType match {
- case Types.LONG =>
- // access LONG field
- fieldAccess
- case Types.SQL_TIMESTAMP =>
- // cast timestamp to long
- Cast(fieldAccess, Types.LONG)
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
new file mode 100644
index 0000000..12cd564
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
+
+/**
+ * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
+ *
+ * @param field The field to convert into a rowtime attribute.
+ */
+class ExistingField(field: String) extends TimestampExtractor {
+
+ override def getArgumentFields: Array[String] = Array(field)
+
+ @throws[ValidationException]
+ override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
+
+ // get type of field to convert
+ val fieldType = physicalFieldTypes(0)
+
+ // check that the field to convert is of type Long or Timestamp
+ fieldType match {
+ case Types.LONG => // OK
+ case Types.SQL_TIMESTAMP => // OK
+ case _: TypeInformation[_] =>
+ throw ValidationException(
+ s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
+ }
+ }
+
+ /**
+ * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
+ * rowtime attribute.
+ */
+ def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+
+ val fieldAccess: Expression = fieldAccesses(0)
+
+ fieldAccess.resultType match {
+ case Types.LONG =>
+ // access LONG field
+ fieldAccess
+ case Types.SQL_TIMESTAMP =>
+ // cast timestamp to long
+ Cast(fieldAccess, Types.LONG)
+ }
+ }
+
+}
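
[Editor's note] A minimal sketch of the extractor contract, using only the API
shown above; the field name "ts" and its TIMESTAMP type are illustrative:

import org.apache.flink.table.api.Types
import org.apache.flink.table.expressions.ResolvedFieldReference
import org.apache.flink.table.sources.tsextractors.ExistingField

object ExistingFieldSketch {
  def main(args: Array[String]): Unit = {
    val extractor = new ExistingField("ts")
    // passes: TIMESTAMP is one of the two accepted types
    extractor.validateArgumentFields(Array(Types.SQL_TIMESTAMP))
    // yields Cast('ts, Types.LONG) because the field is a TIMESTAMP
    val expr = extractor.getExpression(
      Array(ResolvedFieldReference("ts", Types.SQL_TIMESTAMP)))
    println(expr)
  }
}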
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
new file mode 100644
index 0000000..329f790
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
+
+/**
+ * Extracts the timestamp of a StreamRecord into a rowtime attribute.
+ *
+ * Note: This extractor only works for StreamTableSources.
+ */
+class StreamRecordTimestamp extends TimestampExtractor {
+
+ /** No argument fields required. */
+ override def getArgumentFields: Array[String] = Array()
+
+ /** No validation required. */
+ @throws[ValidationException]
+ override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = { }
+
+ /**
+ * Returns an [[Expression]] that extracts the timestamp of a StreamRecord.
+ */
+ override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+ org.apache.flink.table.expressions.StreamRecordTimestamp()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
new file mode 100644
index 0000000..34c6ba5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.FieldComputer
+
+/**
+ * Provides an expression to extract the timestamp for a rowtime attribute.
+ */
+abstract class TimestampExtractor extends FieldComputer[Long] {
+
+ /** Timestamp extractors compute the timestamp as Long. */
+ override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
deleted file mode 100644
index eec423f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.types.Row
-
-/**
- * Provides a strategy to generate watermarks for a rowtime attribute.
- *
- * A watermark strategy is either a [[PeriodicWatermarkAssigner]] or
- * [[PunctuatedWatermarkAssigner]].
- *
- */
-sealed abstract class WatermarkStrategy extends Serializable
-
-/** A periodic watermark assigner. */
-abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
-
- /**
- * Updates the assigner with the next timestamp.
- *
- * @param timestamp The next timestamp to update the assigner.
- */
- def nextTimestamp(timestamp: Long): Unit
-
- /**
- * Returns the current watermark.
- *
- * @return The current watermark.
- */
- def getWatermark: Watermark
-}
-
-/** A punctuated watermark assigner. */
-abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
-
- /**
- * Returns the watermark for the current row or null if no watermark should be generated.
- *
- * @param row The current row.
- * @param timestamp The value of the timestamp attribute for the row.
- * @return The watermark for this row or null if no watermark should be generated.
- */
- def getWatermark(row: Row, timestamp: Long): Watermark
-}
-
-/**
- * A watermark assigner for ascending rowtime attributes.
- *
- * Emits a watermark of the maximum observed timestamp so far minus 1.
- * Rows that have a timestamp equal to the max timestamp are not late.
- */
-class AscendingWatermarks extends PeriodicWatermarkAssigner {
-
- var maxTimestamp: Long = Long.MinValue + 1
-
- override def nextTimestamp(timestamp: Long): Unit = {
- if (timestamp > maxTimestamp) {
- maxTimestamp = timestamp
- }
- }
-
- override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
-}
-
-/**
- * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval.
- *
- * Emits watermarks which are the maximum observed timestamp minus the specified delay.
- *
- * @param delay The delay by which watermarks are behind the maximum observed timestamp.
- */
-class BoundedOutOfOrderWatermarks(val delay: Long) extends PeriodicWatermarkAssigner {
-
- var maxTimestamp: Long = Long.MinValue + delay
-
- override def nextTimestamp(timestamp: Long): Unit = {
- if (timestamp > maxTimestamp) {
- maxTimestamp = timestamp
- }
- }
-
- override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
new file mode 100644
index 0000000..3a947ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+ * A watermark assigner for ascending rowtime attributes.
+ *
+ * Emits a watermark of the maximum observed timestamp so far minus 1.
+ * Rows that have a timestamp equal to the max timestamp are not late.
+ */
+class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+ var maxTimestamp: Long = Long.MinValue + 1
+
+ override def nextTimestamp(timestamp: Long): Unit = {
+ if (timestamp > maxTimestamp) {
+ maxTimestamp = timestamp
+ }
+ }
+
+ override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0e92b663/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
new file mode 100644
index 0000000..957daca
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies
+
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+ * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval.
+ *
+ * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+ *
+ * @param delay The delay by which watermarks are behind the maximum observed timestamp.
+ */
+class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
+
+ var maxTimestamp: Long = Long.MinValue + delay
+
+ override def nextTimestamp(timestamp: Long): Unit = {
+ if (timestamp > maxTimestamp) {
+ maxTimestamp = timestamp
+ }
+ }
+
+ override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
+}
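
[Editor's note] The two assigners above cover the common periodic cases; custom
strategies simply extend the abstract classes. A hedged sketch of a punctuated
strategy against the PunctuatedWatermarkAssigner contract shown earlier (the
marker value "WM" in field 0 is purely hypothetical):

import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
import org.apache.flink.types.Row

/** Emits a watermark only for rows flagged as punctuation markers. */
class MarkerPunctuatedWatermarks extends PunctuatedWatermarkAssigner {

  override def getWatermark(row: Row, timestamp: Long): Watermark =
    // null means: no watermark for this row (as documented in the contract)
    if ("WM" == row.getField(0)) new Watermark(timestamp) else null
}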