Posted to commits@flink.apache.org by rm...@apache.org on 2019/07/12 10:53:01 UTC

[flink] branch release-1.9 updated: [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 9d7045c  [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings
9d7045c is described below

commit 9d7045c47b45814fa0b734bf3784df06dd70a490
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Sun Jul 7 14:18:46 2019 +0200

    [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings
---
 docs/dev/connectors/pubsub.md | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md
index 0ee8187..59b3e00 100644
--- a/docs/dev/connectors/pubsub.md
+++ b/docs/dev/connectors/pubsub.md
@@ -94,7 +94,7 @@ DataStream<SomeObject> dataStream = (...);
 
 SerializationSchema<SomeObject> serializationSchema = (...);
 SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
-                                                .withDeserializationSchema(deserializer)
+                                                .withSerializationSchema(serializationSchema)
                                                 .withProjectName("project")
                                                 .withSubscriptionName("subscription")
                                                 .build()
@@ -120,18 +120,20 @@ The following example shows how you would create a source to read messages from
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
+String hostAndPort = "localhost:1234";
 DeserializationSchema<SomeObject> deserializationSchema = (...);
 SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                       .withDeserializationSchema(deserializationSchema)
                                                       .withProjectName("my-fake-project")
                                                       .withSubscriptionName("subscription")
-                                                      .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
+                                                      .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator(hostAndPort, "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
                                                       .build();
+SerializationSchema<SomeObject> serializationSchema = (...);
 SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
-                                                .withDeserializationSchema(deserializationSchema)
+                                                .withSerializationSchema(serializationSchema)
                                                 .withProjectName("my-fake-project")
                                                 .withSubscriptionName("subscription")
-                                                .withHostAndPortForEmulator(getPubSubHostPort())
+                                                .withHostAndPortForEmulator(hostAndPort)
                                                 .build()
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();