Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/23 14:33:30 UTC

[1/2] flink git commit: [FLINK-8118] [table] Improve KafkaTableSource documentation

Repository: flink
Updated Branches:
  refs/heads/release-1.4 828ef09b0 -> 13631b961


[FLINK-8118] [table] Improve KafkaTableSource documentation
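
For reference, the corrected snippets consistently type the source as KafkaTableSource and use the renamed builder methods. A minimal consolidated sketch of the documented pattern (kafkaProps is assumed to be a pre-configured java.util.Properties; withKafkaProperties is the property setter used elsewhere in these docs):

    // hypothetical consolidated example of the corrected builder usage
    KafkaTableSource source = Kafka010JsonTableSource.builder()
      // set Kafka topic
      .forTopic("sensors")
      // set Kafka consumer properties (kafkaProps: assumed pre-configured Properties)
      .withKafkaProperties(kafkaProps)
      // set Table schema
      .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temperature", Types.DOUBLE()).build())
      // renamed from startReadingFromEarliest()
      .fromEarliest()
      .build();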


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13631b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13631b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13631b96

Branch: refs/heads/release-1.4
Commit: 13631b9617d32e46eba51c9125019ec5e77c39f3
Parents: 2fb2458
Author: twalthr <tw...@apache.org>
Authored: Thu Nov 23 14:30:15 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 23 15:34:22 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   | 56 ++++++++++----------
 .../kafka/KafkaTableSourceTestBase.java         |  5 +-
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/13631b96/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index aaf23bc..2b10278 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -63,7 +63,7 @@ A `KafkaJsonTableSource` is created and configured using a builder. The followin
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // create builder
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -80,7 +80,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -108,7 +108,7 @@ Map<String, String> mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -126,7 +126,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -150,7 +150,7 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -160,7 +160,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -174,20 +174,20 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build()
 {% endhighlight %}
 </div>
@@ -205,7 +205,7 @@ A `KafkaAvroTableSource` is created and configured using a builder. The followin
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // create builder
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -224,7 +224,7 @@ TableSource source = Kafka010AvroTableSource.builder()
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -256,7 +256,7 @@ Map<String, String> mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -264,15 +264,15 @@ TableSource source = Kafka010AvroTableSource.builder()
     .field("temperature", Types.DOUBLE()).build())
   // set class of Avro record with fields [id, temp]
   .forAvroRecordClass(SensorReading.class)
-  // set mapping from table fields to JSON fields
-  .withTableToJsonMapping(mapping)
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(mapping)
   .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010AvroTableSource.builder()
+val source: KafkaTableSource = Kafka010AvroTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -280,8 +280,8 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder()
     .field("temperature", Types.DOUBLE).build())
   // set class of Avro record with fields [id, temp]
   .forAvroRecordClass(classOf[SensorReading])
-  // set mapping from table fields to JSON fields
-  .withTableToJsonMapping(Map(
+  // set mapping from table fields to Avro fields
+  .withTableToAvroMapping(Map(
     "sensorId" -> "id", 
     "temperature" -> "temp").asJava)
   .build()
@@ -294,20 +294,20 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder()
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010AvroTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build()
 {% endhighlight %}
 </div>
@@ -326,7 +326,7 @@ A table schema field of type `SQL_TIMESTAMP` can be declared as a processing tim
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ... 
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG())  
@@ -341,7 +341,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG)
@@ -372,7 +372,7 @@ The following example shows how to configure a rowtime attribute.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG())
@@ -392,7 +392,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG)
@@ -418,7 +418,7 @@ Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies whe
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG())
@@ -437,7 +437,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   .withSchema(TableSchema.builder()
     .field("sensorId", Types.LONG)

http://git-wip-us.apache.org/repos/asf/flink/blob/13631b96/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 64dac06..688fd73 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -159,7 +159,7 @@ public abstract class KafkaTableSourceTestBase {
 	}
 
 	@Test
-	public void testKafkaTSRowtimeAttribute() {
+	public void testRowtimeAttribute2() {
 		KafkaTableSource.Builder b = getBuilder();
 		configureBuilder(b);
 
@@ -191,7 +191,8 @@ public abstract class KafkaTableSourceTestBase {
 	}
 
 	@Test
-	public void testKafkaTSSetConsumeOffsets() {
+	@SuppressWarnings("unchecked")
+	public void testConsumerOffsets() {
 		KafkaTableSource.Builder b = getBuilder();
 		configureBuilder(b);
 


[2/2] flink git commit: [FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

Posted by fh...@apache.org.
[FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

This closes #5056.
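
The new builder methods (fromEarliest(), fromLatest(), fromGroupOffsets(), fromSpecificOffsets(Map)) mirror the consumer's start-position settings. A minimal sketch of the specific-offsets variant (the topic name, partition numbers, and offset values below are made-up illustration values):

    // hypothetical example; uses org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
    Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new KafkaTopicPartition("sensors", 0), 23L);  // partition 0 starts at offset 23
    offsets.put(new KafkaTopicPartition("sensors", 1), 31L);  // partition 1 starts at offset 31

    KafkaTableSource source = Kafka010JsonTableSource.builder()
      // ... topic, consumer properties, and table schema as usual
      .fromSpecificOffsets(offsets)  // sets startup mode to SPECIFIC_OFFSETS
      .build();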


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fb24581
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fb24581
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fb24581

Branch: refs/heads/release-1.4
Commit: 2fb24581a1775084e3be8c2575c129d250f39313
Parents: 828ef09
Author: Xingcan Cui <xi...@gmail.com>
Authored: Thu Nov 23 00:00:39 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 23 15:34:22 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  50 +++++++-
 .../kafka/Kafka010AvroTableSource.java          |   2 +-
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/Kafka011AvroTableSource.java          |   2 +-
 .../kafka/Kafka011JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka011TableSource.java   |   2 +-
 .../kafka/Kafka08AvroTableSource.java           |   2 +-
 .../kafka/Kafka08JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka08TableSource.java    |   2 +-
 .../kafka/Kafka09AvroTableSource.java           |   2 +-
 .../kafka/Kafka09JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka09TableSource.java    |   2 +-
 .../connectors/kafka/KafkaAvroTableSource.java  |   2 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   2 +-
 .../connectors/kafka/KafkaTableSource.java      | 126 ++++++++++++++++++-
 .../kafka/KafkaTableSourceTestBase.java         |  44 +++++++
 17 files changed, 230 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7387358..aaf23bc 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -145,7 +145,7 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
 </div>
 </div>
 
-* **Missing Field Handling** By default, a missing JSON field is set to `null`. You can enable strict JSON parsing that will cancel the source (and query) if a field is missing.
+* **Missing Field Handling:** By default, a missing JSON field is set to `null`. You can enable strict JSON parsing that will cancel the source (and query) if a field is missing.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -169,6 +169,30 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
 </div>
 </div>
 
+* **Specify the start reading position:** By default, the table source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions via the builder's methods, which correspond to the configurations in section [Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
 {% top %}
 
 ### KafkaAvroTableSource
@@ -265,6 +289,30 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder()
 </div>
 </div>
 
+* **Specify the start reading position:** By default, the table source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions via the builder's methods, which correspond to the configurations in section [Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
 {% top %}
 
 ### Configuring a Processing Time Attribute

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index fbc58ea..660162a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index bbdb32f..5f9984e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index bc675eb..379c562 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka010TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index af3b5af..a9a109c 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index 71158f6..cee7c61 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index dbf980b..8c40318 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka011TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 8f45881..9105c73 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index b3b37c6..639093d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 8270b78..3bb6a94 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka08TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
index 808be01..fb8496a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka09AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index a699d65..ded23b0 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka09JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 1d2c028..df15452 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka09TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 8cea36c..055b679 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -44,7 +44,7 @@ import java.util.Properties;
  * A version-agnostic Kafka Avro {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 9a6525c..6806673 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -32,7 +32,7 @@ import java.util.Properties;
  * A version-agnostic Kafka JSON {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  *
  * <p>The field names are used to parse the JSON file and so are the types.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 3291f7d..d0ee7de 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
@@ -37,6 +39,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import scala.Option;
@@ -45,7 +48,7 @@ import scala.Option;
  * A version-agnostic Kafka {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 public abstract class KafkaTableSource
 	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
@@ -68,6 +71,12 @@ public abstract class KafkaTableSource
 	/** Descriptor for a rowtime attribute. */
 	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
 
+	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+	private StartupMode startupMode;
+
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
 	/**
 	 * Creates a generic Kafka {@link StreamTableSource}.
 	 *
@@ -121,6 +130,37 @@ public abstract class KafkaTableSource
 		return rowtimeAttributeDescriptors;
 	}
 
+	/**
+	 * Returns a version-specific Kafka consumer with the start position configured.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer
+	 */
+	protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema) {
+		FlinkKafkaConsumerBase<Row> kafkaConsumer =
+				createKafkaConsumer(topic, properties, deserializationSchema);
+		switch (startupMode) {
+			case EARLIEST:
+				kafkaConsumer.setStartFromEarliest();
+				break;
+			case LATEST:
+				kafkaConsumer.setStartFromLatest();
+				break;
+			case GROUP_OFFSETS:
+				kafkaConsumer.setStartFromGroupOffsets();
+				break;
+			case SPECIFIC_OFFSETS:
+				kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
+				break;
+		}
+		return kafkaConsumer;
+	}
+
 	//////// SETTERS FOR OPTIONAL PARAMETERS
 
 	/**
@@ -160,17 +200,35 @@ public abstract class KafkaTableSource
 		this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
 	}
 
+	/**
+	 * Sets the startup mode of the TableSource.
+	 *
+	 * @param startupMode The startup mode.
+	 */
+	protected void setStartupMode(StartupMode startupMode) {
+		this.startupMode = startupMode;
+	}
+
+	/**
+	 * Sets the startup offsets of the TableSource; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 *
+	 * @param specificStartupOffsets The startup offsets for different partitions.
+	 */
+	protected void setSpecificStartupOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+		this.specificStartupOffsets = specificStartupOffsets;
+	}
+
 	//////// ABSTRACT METHODS FOR SUBCLASSES
 
 	/**
-	 * Returns the version-specific Kafka consumer.
+	 * Creates a version-specific Kafka consumer.
 	 *
 	 * @param topic                 Kafka topic to consume.
 	 * @param properties            Properties for the Kafka consumer.
 	 * @param deserializationSchema Deserialization schema to use for Kafka records.
 	 * @return The version-specific Kafka consumer
 	 */
-	abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+	protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
 			String topic,
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema);
@@ -201,6 +259,13 @@ public abstract class KafkaTableSource
 
 		private RowtimeAttributeDescriptor rowtimeAttributeDescriptor;
 
+		/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+		private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+
+		/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+		private Map<KafkaTopicPartition, Long> specificStartupOffsets = null;
+
+
 		/**
 		 * Sets the topic from which the table is read.
 		 *
@@ -310,6 +375,51 @@ public abstract class KafkaTableSource
 		}
 
 		/**
+		 * Configures the TableSource to start reading from the earliest offset for all partitions.
+		 *
+		 * @see FlinkKafkaConsumerBase#setStartFromEarliest()
+		 */
+		public B fromEarliest() {
+			this.startupMode = StartupMode.EARLIEST;
+			this.specificStartupOffsets = null;
+			return builder();
+		}
+
+		/**
+		 * Configures the TableSource to start reading from the latest offset for all partitions.
+		 *
+		 * @see FlinkKafkaConsumerBase#setStartFromLatest()
+		 */
+		public B fromLatest() {
+			this.startupMode = StartupMode.LATEST;
+			this.specificStartupOffsets = null;
+			return builder();
+		}
+
+		/**
+		 * Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
+		 *
+		 * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
+		 */
+		public B fromGroupOffsets() {
+			this.startupMode = StartupMode.GROUP_OFFSETS;
+			this.specificStartupOffsets = null;
+			return builder();
+		}
+
+		/**
+		 * Configures the TableSource to start reading partitions from specific offsets, set independently for each partition.
+		 *
+		 * @param specificStartupOffsets the specified offsets for partitions
+		 * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
+		 */
+		public B fromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+			this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+			this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
+			return builder();
+		}
+
+		/**
 		 * Returns the configured topic.
 		 *
 		 * @return the configured topic.
@@ -357,6 +467,16 @@ public abstract class KafkaTableSource
 			} else {
 				tableSource.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
 			}
+			tableSource.setStartupMode(startupMode);
+			switch (startupMode) {
+				case EARLIEST:
+				case LATEST:
+				case GROUP_OFFSETS:
+					break;
+				case SPECIFIC_OFFSETS:
+					tableSource.setSpecificStartupOffsets(specificStartupOffsets);
+					break;
+			}
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 7a882f4..64dac06 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.types.Row;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -44,6 +45,7 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Abstract test base for all Kafka table sources.
@@ -188,6 +190,48 @@ public abstract class KafkaTableSourceTestBase {
 		}
 	}
 
+	@Test
+	public void testKafkaTSSetConsumeOffsets() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+
+		// test the default behavior
+		KafkaTableSource source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
+
+		// test reading from earliest
+		b.fromEarliest();
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromEarliest();
+
+		// test reading from latest
+		b.fromLatest();
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromLatest();
+
+		// test reading from group offsets
+		b.fromGroupOffsets();
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
+
+		// test reading from given offsets
+		b.fromSpecificOffsets(mock(Map.class));
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromSpecificOffsets(any(Map.class));
+	}
+
 	protected abstract KafkaTableSource.Builder getBuilder();
 
 	protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();