Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/12 10:51:52 UTC
[flink] branch master updated: [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3059a2f [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings
3059a2f is described below
commit 3059a2f6420724a764cfc934886ff08f65b34d68
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Sun Jul 7 14:18:46 2019 +0200
[FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings
---
docs/dev/connectors/pubsub.md | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md
index 0ee8187..59b3e00 100644
--- a/docs/dev/connectors/pubsub.md
+++ b/docs/dev/connectors/pubsub.md
@@ -94,7 +94,7 @@ DataStream<SomeObject> dataStream = (...);
SerializationSchema<SomeObject> serializationSchema = (...);
SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
- .withDeserializationSchema(deserializer)
+ .withSerializationSchema(serializationSchema)
.withProjectName("project")
.withSubscriptionName("subscription")
.build()
@@ -120,18 +120,20 @@ The following example shows how you would create a source to read messages from
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
+String hostAndPort = "localhost:1234";
DeserializationSchema<SomeObject> deserializationSchema = (...);
SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(deserializationSchema)
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
- .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
+ .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator(hostAndPort, "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
.build();
+SerializationSchema<SomeObject> serializationSchema = (...);
SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
- .withDeserializationSchema(deserializationSchema)
+ .withSerializationSchema(serializationSchema)
.withProjectName("my-fake-project")
.withSubscriptionName("subscription")
- .withHostAndPortForEmulator(getPubSubHostPort())
+ .withHostAndPortForEmulator(hostAndPort)
.build()
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();