You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:57:03 UTC
[32/50] [abbrv] beam git commit: [BEAM-1722] Move PubsubIO into the
google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
deleted file mode 100644
index 4a6ddac..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auth.Credentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.DeleteSubscriptionRequest;
-import com.google.pubsub.v1.DeleteTopicRequest;
-import com.google.pubsub.v1.GetSubscriptionRequest;
-import com.google.pubsub.v1.ListSubscriptionsRequest;
-import com.google.pubsub.v1.ListSubscriptionsResponse;
-import com.google.pubsub.v1.ListTopicsRequest;
-import com.google.pubsub.v1.ListTopicsResponse;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
-import com.google.pubsub.v1.PublishRequest;
-import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc;
-import com.google.pubsub.v1.PublisherGrpc.PublisherBlockingStub;
-import com.google.pubsub.v1.PubsubMessage;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc;
-import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub;
-import com.google.pubsub.v1.Subscription;
-import com.google.pubsub.v1.Topic;
-import io.grpc.Channel;
-import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
-import io.grpc.auth.ClientAuthInterceptor;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * A helper class for talking to Pubsub via grpc.
- *
- * <p>CAUTION: Currently uses the application default credentials and does not respect any
- * credentials-related arguments in {@link GcpOptions}.
- */
-public class PubsubGrpcClient extends PubsubClient {
- private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
- private static final int PUBSUB_PORT = 443;
- private static final int LIST_BATCH_SIZE = 1000;
-
- private static final int DEFAULT_TIMEOUT_S = 15;
-
- private static class PubsubGrpcClientFactory implements PubsubClientFactory {
- @Override
- public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
- throws IOException {
- ManagedChannel channel = NettyChannelBuilder
- .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT)
- .negotiationType(NegotiationType.TLS)
- .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
- .build();
-
- return new PubsubGrpcClient(timestampLabel,
- idLabel,
- DEFAULT_TIMEOUT_S,
- channel,
- options.getGcpCredential());
- }
-
- @Override
- public String getKind() {
- return "Grpc";
- }
- }
-
- /**
- * Factory for creating Pubsub clients using gRCP transport.
- */
- public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory();
-
- /**
- * Timeout for grpc calls (in s).
- */
- private final int timeoutSec;
-
- /**
- * Underlying netty channel, or {@literal null} if closed.
- */
- @Nullable
- private ManagedChannel publisherChannel;
-
- /**
- * Credentials determined from options and environment.
- */
- private final Credentials credentials;
-
- /**
- * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
- * instead.
- */
- @Nullable
- private final String timestampLabel;
-
- /**
- * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
- */
- @Nullable
- private final String idLabel;
-
-
- /**
- * Cached stubs, or null if not cached.
- */
- @Nullable
- private PublisherGrpc.PublisherBlockingStub cachedPublisherStub;
- private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub;
-
- @VisibleForTesting
- PubsubGrpcClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
- int timeoutSec,
- ManagedChannel publisherChannel,
- Credentials credentials) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.timeoutSec = timeoutSec;
- this.publisherChannel = publisherChannel;
- this.credentials = credentials;
- }
-
- /**
- * Gracefully close the underlying netty channel.
- */
- @Override
- public void close() {
- if (publisherChannel == null) {
- // Already closed.
- return;
- }
- // Can gc the underlying stubs.
- cachedPublisherStub = null;
- cachedSubscriberStub = null;
- // Mark the client as having been closed before going further
- // in case we have an exception from the channel.
- ManagedChannel publisherChannel = this.publisherChannel;
- this.publisherChannel = null;
- // Gracefully shutdown the channel.
- publisherChannel.shutdown();
- try {
- publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // Ignore.
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Return channel with interceptor for returning credentials.
- */
- private Channel newChannel() throws IOException {
- checkState(publisherChannel != null, "PubsubGrpcClient has been closed");
- ClientAuthInterceptor interceptor =
- new ClientAuthInterceptor(credentials, Executors.newSingleThreadExecutor());
- return ClientInterceptors.intercept(publisherChannel, interceptor);
- }
-
- /**
- * Return a stub for making a publish request with a timeout.
- */
- private PublisherBlockingStub publisherStub() throws IOException {
- if (cachedPublisherStub == null) {
- cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
- }
- return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
- }
-
- /**
- * Return a stub for making a subscribe request with a timeout.
- */
- private SubscriberBlockingStub subscriberStub() throws IOException {
- if (cachedSubscriberStub == null) {
- cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
- }
- return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
- }
-
- @Override
- public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
- throws IOException {
- PublishRequest.Builder request = PublishRequest.newBuilder()
- .setTopic(topic.getPath());
- for (OutgoingMessage outgoingMessage : outgoingMessages) {
- PubsubMessage.Builder message =
- PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
-
- if (outgoingMessage.attributes != null) {
- message.putAllAttributes(outgoingMessage.attributes);
- }
-
- if (timestampLabel != null) {
- message.getMutableAttributes()
- .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
- }
-
- if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
- message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
- }
-
- request.addMessages(message);
- }
-
- PublishResponse response = publisherStub().publish(request.build());
- return response.getMessageIdsCount();
- }
-
- @Override
- public List<IncomingMessage> pull(
- long requestTimeMsSinceEpoch,
- SubscriptionPath subscription,
- int batchSize,
- boolean returnImmediately) throws IOException {
- PullRequest request = PullRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .setReturnImmediately(returnImmediately)
- .setMaxMessages(batchSize)
- .build();
- PullResponse response = subscriberStub().pull(request);
- if (response.getReceivedMessagesCount() == 0) {
- return ImmutableList.of();
- }
- List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessagesCount());
- for (ReceivedMessage message : response.getReceivedMessagesList()) {
- PubsubMessage pubsubMessage = message.getMessage();
- @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
-
- // Payload.
- byte[] elementBytes = pubsubMessage.getData().toByteArray();
-
- // Timestamp.
- String pubsubTimestampString = null;
- Timestamp timestampProto = pubsubMessage.getPublishTime();
- if (timestampProto != null) {
- pubsubTimestampString = String.valueOf(timestampProto.getSeconds()
- + timestampProto.getNanos() / 1000L);
- }
- long timestampMsSinceEpoch =
- extractTimestamp(timestampLabel, pubsubTimestampString, attributes);
-
- // Ack id.
- String ackId = message.getAckId();
- checkState(!Strings.isNullOrEmpty(ackId));
-
- // Record id, if any.
- @Nullable String recordId = null;
- if (idLabel != null && attributes != null) {
- recordId = attributes.get(idLabel);
- }
- if (Strings.isNullOrEmpty(recordId)) {
- // Fall back to the Pubsub provided message id.
- recordId = pubsubMessage.getMessageId();
- }
-
- incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
- requestTimeMsSinceEpoch, ackId, recordId));
- }
- return incomingMessages;
- }
-
- @Override
- public void acknowledge(SubscriptionPath subscription, List<String> ackIds)
- throws IOException {
- AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .addAllAckIds(ackIds)
- .build();
- subscriberStub().acknowledge(request); // ignore Empty result.
- }
-
- @Override
- public void modifyAckDeadline(
- SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
- throws IOException {
- ModifyAckDeadlineRequest request =
- ModifyAckDeadlineRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .addAllAckIds(ackIds)
- .setAckDeadlineSeconds(deadlineSeconds)
- .build();
- subscriberStub().modifyAckDeadline(request); // ignore Empty result.
- }
-
- @Override
- public void createTopic(TopicPath topic) throws IOException {
- Topic request = Topic.newBuilder()
- .setName(topic.getPath())
- .build();
- publisherStub().createTopic(request); // ignore Topic result.
- }
-
- @Override
- public void deleteTopic(TopicPath topic) throws IOException {
- DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
- .setTopic(topic.getPath())
- .build();
- publisherStub().deleteTopic(request); // ignore Empty result.
- }
-
- @Override
- public List<TopicPath> listTopics(ProjectPath project) throws IOException {
- ListTopicsRequest.Builder request =
- ListTopicsRequest.newBuilder()
- .setProject(project.getPath())
- .setPageSize(LIST_BATCH_SIZE);
- ListTopicsResponse response = publisherStub().listTopics(request.build());
- if (response.getTopicsCount() == 0) {
- return ImmutableList.of();
- }
- List<TopicPath> topics = new ArrayList<>(response.getTopicsCount());
- while (true) {
- for (Topic topic : response.getTopicsList()) {
- topics.add(topicPathFromPath(topic.getName()));
- }
- if (response.getNextPageToken().isEmpty()) {
- break;
- }
- request.setPageToken(response.getNextPageToken());
- response = publisherStub().listTopics(request.build());
- }
- return topics;
- }
-
- @Override
- public void createSubscription(
- TopicPath topic, SubscriptionPath subscription,
- int ackDeadlineSeconds) throws IOException {
- Subscription request = Subscription.newBuilder()
- .setTopic(topic.getPath())
- .setName(subscription.getPath())
- .setAckDeadlineSeconds(ackDeadlineSeconds)
- .build();
- subscriberStub().createSubscription(request); // ignore Subscription result.
- }
-
- @Override
- public void deleteSubscription(SubscriptionPath subscription) throws IOException {
- DeleteSubscriptionRequest request =
- DeleteSubscriptionRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .build();
- subscriberStub().deleteSubscription(request); // ignore Empty result.
- }
-
- @Override
- public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
- throws IOException {
- ListSubscriptionsRequest.Builder request =
- ListSubscriptionsRequest.newBuilder()
- .setProject(project.getPath())
- .setPageSize(LIST_BATCH_SIZE);
- ListSubscriptionsResponse response = subscriberStub().listSubscriptions(request.build());
- if (response.getSubscriptionsCount() == 0) {
- return ImmutableList.of();
- }
- List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptionsCount());
- while (true) {
- for (Subscription subscription : response.getSubscriptionsList()) {
- if (subscription.getTopic().equals(topic.getPath())) {
- subscriptions.add(subscriptionPathFromPath(subscription.getName()));
- }
- }
- if (response.getNextPageToken().isEmpty()) {
- break;
- }
- request.setPageToken(response.getNextPageToken());
- response = subscriberStub().listSubscriptions(request.build());
- }
- return subscriptions;
- }
-
- @Override
- public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
- GetSubscriptionRequest request =
- GetSubscriptionRequest.newBuilder()
- .setSubscription(subscription.getPath())
- .build();
- Subscription response = subscriberStub().getSubscription(request);
- return response.getAckDeadlineSeconds();
- }
-
- @Override
- public boolean isEOF() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
deleted file mode 100644
index ef8abfd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.Pubsub.Builder;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
-import com.google.api.services.pubsub.model.ListTopicsResponse;
-import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * A Pubsub client using JSON transport.
- */
-public class PubsubJsonClient extends PubsubClient {
-
- private static class PubsubJsonClientFactory implements PubsubClientFactory {
- private static HttpRequestInitializer chainHttpRequestInitializer(
- Credentials credential, HttpRequestInitializer httpRequestInitializer) {
- if (credential == null) {
- return httpRequestInitializer;
- } else {
- return new ChainingHttpRequestInitializer(
- new HttpCredentialsAdapter(credential),
- httpRequestInitializer);
- }
- }
-
- @Override
- public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
- throws IOException {
- Pubsub pubsub = new Builder(
- Transport.getTransport(),
- Transport.getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setRootUrl(options.getPubsubRootUrl())
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
- .build();
- return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
- }
-
- @Override
- public String getKind() {
- return "Json";
- }
- }
-
- /**
- * Factory for creating Pubsub clients using Json transport.
- */
- public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
-
- /**
- * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
- * instead.
- */
- @Nullable
- private final String timestampLabel;
-
- /**
- * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids.
- */
- @Nullable
- private final String idLabel;
-
- /**
- * Underlying JSON transport.
- */
- private Pubsub pubsub;
-
- @VisibleForTesting
- PubsubJsonClient(
- @Nullable String timestampLabel,
- @Nullable String idLabel,
- Pubsub pubsub) {
- this.timestampLabel = timestampLabel;
- this.idLabel = idLabel;
- this.pubsub = pubsub;
- }
-
- @Override
- public void close() {
- // Nothing to close.
- }
-
- @Override
- public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
- throws IOException {
- List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
- for (OutgoingMessage outgoingMessage : outgoingMessages) {
- PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
-
- Map<String, String> attributes = outgoingMessage.attributes;
- if ((timestampLabel != null || idLabel != null) && attributes == null) {
- attributes = new TreeMap<>();
- }
- if (attributes != null) {
- pubsubMessage.setAttributes(attributes);
- }
-
- if (timestampLabel != null) {
- attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
- }
-
- if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
- attributes.put(idLabel, outgoingMessage.recordId);
- }
-
- pubsubMessages.add(pubsubMessage);
- }
- PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
- PublishResponse response = pubsub.projects()
- .topics()
- .publish(topic.getPath(), request)
- .execute();
- return response.getMessageIds().size();
- }
-
- @Override
- public List<IncomingMessage> pull(
- long requestTimeMsSinceEpoch,
- SubscriptionPath subscription,
- int batchSize,
- boolean returnImmediately) throws IOException {
- PullRequest request = new PullRequest()
- .setReturnImmediately(returnImmediately)
- .setMaxMessages(batchSize);
- PullResponse response = pubsub.projects()
- .subscriptions()
- .pull(subscription.getPath(), request)
- .execute();
- if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) {
- return ImmutableList.of();
- }
- List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
- for (ReceivedMessage message : response.getReceivedMessages()) {
- PubsubMessage pubsubMessage = message.getMessage();
- @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
-
- // Payload.
- byte[] elementBytes = pubsubMessage.decodeData();
-
- // Timestamp.
- long timestampMsSinceEpoch =
- extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
-
- // Ack id.
- String ackId = message.getAckId();
- checkState(!Strings.isNullOrEmpty(ackId));
-
- // Record id, if any.
- @Nullable String recordId = null;
- if (idLabel != null && attributes != null) {
- recordId = attributes.get(idLabel);
- }
- if (Strings.isNullOrEmpty(recordId)) {
- // Fall back to the Pubsub provided message id.
- recordId = pubsubMessage.getMessageId();
- }
-
- incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
- requestTimeMsSinceEpoch, ackId, recordId));
- }
-
- return incomingMessages;
- }
-
- @Override
- public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
- AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
- pubsub.projects()
- .subscriptions()
- .acknowledge(subscription.getPath(), request)
- .execute(); // ignore Empty result.
- }
-
- @Override
- public void modifyAckDeadline(
- SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
- throws IOException {
- ModifyAckDeadlineRequest request =
- new ModifyAckDeadlineRequest().setAckIds(ackIds)
- .setAckDeadlineSeconds(deadlineSeconds);
- pubsub.projects()
- .subscriptions()
- .modifyAckDeadline(subscription.getPath(), request)
- .execute(); // ignore Empty result.
- }
-
- @Override
- public void createTopic(TopicPath topic) throws IOException {
- pubsub.projects()
- .topics()
- .create(topic.getPath(), new Topic())
- .execute(); // ignore Topic result.
- }
-
- @Override
- public void deleteTopic(TopicPath topic) throws IOException {
- pubsub.projects()
- .topics()
- .delete(topic.getPath())
- .execute(); // ignore Empty result.
- }
-
- @Override
- public List<TopicPath> listTopics(ProjectPath project) throws IOException {
- ListTopicsResponse response = pubsub.projects()
- .topics()
- .list(project.getPath())
- .execute();
- if (response.getTopics() == null || response.getTopics().isEmpty()) {
- return ImmutableList.of();
- }
- List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
- for (Topic topic : response.getTopics()) {
- topics.add(topicPathFromPath(topic.getName()));
- }
- return topics;
- }
-
- @Override
- public void createSubscription(
- TopicPath topic, SubscriptionPath subscription,
- int ackDeadlineSeconds) throws IOException {
- Subscription request = new Subscription()
- .setTopic(topic.getPath())
- .setAckDeadlineSeconds(ackDeadlineSeconds);
- pubsub.projects()
- .subscriptions()
- .create(subscription.getPath(), request)
- .execute(); // ignore Subscription result.
- }
-
- @Override
- public void deleteSubscription(SubscriptionPath subscription) throws IOException {
- pubsub.projects()
- .subscriptions()
- .delete(subscription.getPath())
- .execute(); // ignore Empty result.
- }
-
- @Override
- public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
- throws IOException {
- ListSubscriptionsResponse response = pubsub.projects()
- .subscriptions()
- .list(project.getPath())
- .execute();
- if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) {
- return ImmutableList.of();
- }
- List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size());
- for (Subscription subscription : response.getSubscriptions()) {
- if (subscription.getTopic().equals(topic.getPath())) {
- subscriptions.add(subscriptionPathFromPath(subscription.getName()));
- }
- }
- return subscriptions;
- }
-
- @Override
- public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
- Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
- return response.getAckDeadlineSeconds();
- }
-
- @Override
- public boolean isEOF() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
deleted file mode 100644
index 61479f9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.util.Clock;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
- * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
- * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
- */
-public class PubsubTestClient extends PubsubClient implements Serializable {
- /**
- * Mimic the state of the simulated Pubsub 'service'.
- *
- * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
- * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
- * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
- * following data structures over all clients and must resort to a static.
- */
- private static class State {
- /**
- * True if has been primed for a test but not yet validated.
- */
- boolean isActive;
-
- /**
- * Publish mode only: Only publish calls for this topic are allowed.
- */
- @Nullable
- TopicPath expectedTopic;
-
- /**
- * Publish mode only: Messages yet to seen in a {@link #publish} call.
- */
- @Nullable
- Set<OutgoingMessage> remainingExpectedOutgoingMessages;
-
- /**
- * Publish mode only: Messages which should throw when first sent to simulate transient publish
- * failure.
- */
- @Nullable
- Set<OutgoingMessage> remainingFailingOutgoingMessages;
-
- /**
- * Pull mode only: Clock from which to get current time.
- */
- @Nullable
- Clock clock;
-
- /**
- * Pull mode only: Only pull calls for this subscription are allowed.
- */
- @Nullable
- SubscriptionPath expectedSubscription;
-
- /**
- * Pull mode only: Timeout to simulate.
- */
- int ackTimeoutSec;
-
- /**
- * Pull mode only: Messages waiting to be received by a {@link #pull} call.
- */
- @Nullable
- List<IncomingMessage> remainingPendingIncomingMessages;
-
- /**
- * Pull mode only: Messages which have been returned from a {@link #pull} call and
- * not yet ACKed by an {@link #acknowledge} call.
- */
- @Nullable
- Map<String, IncomingMessage> pendingAckIncomingMessages;
-
- /**
- * Pull mode only: When above messages are due to have their ACK deadlines expire.
- */
- @Nullable
- Map<String, Long> ackDeadline;
- }
-
- private static final State STATE = new State();
-
- /** Closing the factory will validate all expected messages were processed. */
- public interface PubsubTestClientFactory
- extends PubsubClientFactory, Closeable, Serializable {
- }
-
- /**
- * Return a factory for testing publishers. Only one factory may be in-flight at a time.
- * The factory must be closed when the test is complete, at which point final validation will
- * occur.
- */
- public static PubsubTestClientFactory createFactoryForPublish(
- final TopicPath expectedTopic,
- final Iterable<OutgoingMessage> expectedOutgoingMessages,
- final Iterable<OutgoingMessage> failingOutgoingMessages) {
- synchronized (STATE) {
- checkState(!STATE.isActive, "Test still in flight");
- STATE.expectedTopic = expectedTopic;
- STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
- STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
- STATE.isActive = true;
- }
- return new PubsubTestClientFactory() {
- @Override
- public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
- throws IOException {
- return new PubsubTestClient();
- }
-
- @Override
- public String getKind() {
- return "PublishTest";
- }
-
- @Override
- public void close() {
- synchronized (STATE) {
- checkState(STATE.isActive, "No test still in flight");
- checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
- "Still waiting for %s messages to be published",
- STATE.remainingExpectedOutgoingMessages.size());
- STATE.isActive = false;
- STATE.remainingExpectedOutgoingMessages = null;
- }
- }
- };
- }
-
- /**
- * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
- * The factory must be closed when the test in complete
- */
- public static PubsubTestClientFactory createFactoryForPull(
- final Clock clock,
- final SubscriptionPath expectedSubscription,
- final int ackTimeoutSec,
- final Iterable<IncomingMessage> expectedIncomingMessages) {
- synchronized (STATE) {
- checkState(!STATE.isActive, "Test still in flight");
- STATE.clock = clock;
- STATE.expectedSubscription = expectedSubscription;
- STATE.ackTimeoutSec = ackTimeoutSec;
- STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
- STATE.pendingAckIncomingMessages = new HashMap<>();
- STATE.ackDeadline = new HashMap<>();
- STATE.isActive = true;
- }
- return new PubsubTestClientFactory() {
- @Override
- public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
- throws IOException {
- return new PubsubTestClient();
- }
-
- @Override
- public String getKind() {
- return "PullTest";
- }
-
- @Override
- public void close() {
- synchronized (STATE) {
- checkState(STATE.isActive, "No test still in flight");
- checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
- "Still waiting for %s messages to be pulled",
- STATE.remainingPendingIncomingMessages.size());
- checkState(STATE.pendingAckIncomingMessages.isEmpty(),
- "Still waiting for %s messages to be ACKed",
- STATE.pendingAckIncomingMessages.size());
- checkState(STATE.ackDeadline.isEmpty(),
- "Still waiting for %s messages to be ACKed",
- STATE.ackDeadline.size());
- STATE.isActive = false;
- STATE.remainingPendingIncomingMessages = null;
- STATE.pendingAckIncomingMessages = null;
- STATE.ackDeadline = null;
- }
- }
- };
- }
-
- public static PubsubTestClientFactory createFactoryForCreateSubscription() {
- return new PubsubTestClientFactory() {
- int numCalls = 0;
-
- @Override
- public void close() throws IOException {
- checkState(
- numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
- }
-
- @Override
- public PubsubClient newClient(
- @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
- throws IOException {
- return new PubsubTestClient() {
- @Override
- public void createSubscription(
- TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
- throws IOException {
- checkState(numCalls == 0, "Expected at most one subscription to be created");
- numCalls++;
- }
- };
- }
-
- @Override
- public String getKind() {
- return "CreateSubscriptionTest";
- }
- };
- }
-
- /**
- * Return true if in pull mode.
- */
- private boolean inPullMode() {
- checkState(STATE.isActive, "No test is active");
- return STATE.expectedSubscription != null;
- }
-
- /**
- * Return true if in publish mode.
- */
- private boolean inPublishMode() {
- checkState(STATE.isActive, "No test is active");
- return STATE.expectedTopic != null;
- }
-
- /**
- * For subscription mode only:
- * Track progression of time according to the {@link Clock} passed . This will simulate Pubsub
- * expiring
- * outstanding ACKs.
- */
- public void advance() {
- synchronized (STATE) {
- checkState(inPullMode(), "Can only advance in pull mode");
- // Any messages who's ACKs timed out are available for re-pulling.
- Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
- while (deadlineItr.hasNext()) {
- Map.Entry<String, Long> entry = deadlineItr.next();
- if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
- STATE.remainingPendingIncomingMessages.add(
- STATE.pendingAckIncomingMessages.remove(entry.getKey()));
- deadlineItr.remove();
- }
- }
- }
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public int publish(
- TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
- synchronized (STATE) {
- checkState(inPublishMode(), "Can only publish in publish mode");
- checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
- STATE.expectedTopic);
- for (OutgoingMessage outgoingMessage : outgoingMessages) {
- if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
- throw new RuntimeException("Simulating failure for " + outgoingMessage);
- }
- checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
- "Unexpected outgoing message %s", outgoingMessage);
- }
- return outgoingMessages.size();
- }
- }
-
- @Override
- public List<IncomingMessage> pull(
- long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
- boolean returnImmediately) throws IOException {
- synchronized (STATE) {
- checkState(inPullMode(), "Can only pull in pull mode");
- long now = STATE.clock.currentTimeMillis();
- checkState(requestTimeMsSinceEpoch == now,
- "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
- checkState(subscription.equals(STATE.expectedSubscription),
- "Subscription %s does not match expected %s", subscription,
- STATE.expectedSubscription);
- checkState(returnImmediately, "Pull only supported if returning immediately");
-
- List<IncomingMessage> incomingMessages = new ArrayList<>();
- Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
- while (pendItr.hasNext()) {
- IncomingMessage incomingMessage = pendItr.next();
- pendItr.remove();
- IncomingMessage incomingMessageWithRequestTime =
- incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
- incomingMessages.add(incomingMessageWithRequestTime);
- STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
- incomingMessageWithRequestTime);
- STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
- requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
- if (incomingMessages.size() >= batchSize) {
- break;
- }
- }
- return incomingMessages;
- }
- }
-
- @Override
- public void acknowledge(
- SubscriptionPath subscription,
- List<String> ackIds) throws IOException {
- synchronized (STATE) {
- checkState(inPullMode(), "Can only acknowledge in pull mode");
- checkState(subscription.equals(STATE.expectedSubscription),
- "Subscription %s does not match expected %s", subscription,
- STATE.expectedSubscription);
-
- for (String ackId : ackIds) {
- checkState(STATE.ackDeadline.remove(ackId) != null,
- "No message with ACK id %s is waiting for an ACK", ackId);
- checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
- "No message with ACK id %s is waiting for an ACK", ackId);
- }
- }
- }
-
- @Override
- public void modifyAckDeadline(
- SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
- synchronized (STATE) {
- checkState(inPullMode(), "Can only modify ack deadline in pull mode");
- checkState(subscription.equals(STATE.expectedSubscription),
- "Subscription %s does not match expected %s", subscription,
- STATE.expectedSubscription);
-
- for (String ackId : ackIds) {
- if (deadlineSeconds > 0) {
- checkState(STATE.ackDeadline.remove(ackId) != null,
- "No message with ACK id %s is waiting for an ACK", ackId);
- checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
- "No message with ACK id %s is waiting for an ACK", ackId);
- STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000);
- } else {
- checkState(STATE.ackDeadline.remove(ackId) != null,
- "No message with ACK id %s is waiting for an ACK", ackId);
- IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
- checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
- STATE.remainingPendingIncomingMessages.add(message);
- }
- }
- }
- }
-
- @Override
- public void createTopic(TopicPath topic) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void deleteTopic(TopicPath topic) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<TopicPath> listTopics(ProjectPath project) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void createSubscription(
- TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void deleteSubscription(SubscriptionPath subscription) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<SubscriptionPath> listSubscriptions(
- ProjectPath project, TopicPath topic) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
- synchronized (STATE) {
- return STATE.ackTimeoutSec;
- }
- }
-
- @Override
- public boolean isEOF() {
- synchronized (STATE) {
- checkState(inPullMode(), "Can only check EOF in pull mode");
- return STATE.remainingPendingIncomingMessages.isEmpty();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 1edfa1d..80c093b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -107,8 +107,7 @@ public class Transport {
/**
* Returns a Pubsub client builder using the specified {@link PubsubOptions}.
*
- * @deprecated Use an appropriate
- * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory}
+ * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory
*/
@Deprecated
public static Pubsub.Builder
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
deleted file mode 100644
index c996409..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-import java.util.Set;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for PubsubIO Read and Write transforms.
- */
-@RunWith(JUnit4.class)
-public class PubsubIOTest {
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testPubsubIOGetName() {
- assertEquals("PubsubIO.Read",
- PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName());
- assertEquals("PubsubIO.Write",
- PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
- }
-
- @Test
- public void testTopicValidationSuccess() throws Exception {
- PubsubIO.<String>read().topic("projects/my-project/topics/abc");
- PubsubIO.<String>read().topic("projects/my-project/topics/ABC");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234");
- PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
- PubsubIO.<String>read().topic(new StringBuilder()
- .append("projects/my-project/topics/A-really-long-one-")
- .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
- .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
- .append("11111111111111111111111111111111111111111111111111111111111111111111111111")
- .toString());
- }
-
- @Test
- public void testTopicValidationBadCharacter() throws Exception {
- thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc");
- }
-
- @Test
- public void testTopicValidationTooLong() throws Exception {
- thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().topic(new StringBuilder().append
- ("projects/my-project/topics/A-really-long-one-")
- .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
- .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
- .append("1111111111111111111111111111111111111111111111111111111111111111111111111111")
- .toString());
- }
-
- @Test
- public void testReadTopicDisplayData() {
- String topic = "projects/project/topics/topic";
- String subscription = "projects/project/subscriptions/subscription";
- Duration maxReadTime = Duration.standardMinutes(5);
- PubsubIO.Read<String> read = PubsubIO.<String>read()
- .topic(StaticValueProvider.of(topic))
- .timestampLabel("myTimestamp")
- .idLabel("myId");
-
- DisplayData displayData = DisplayData.from(read);
-
- assertThat(displayData, hasDisplayItem("topic", topic));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
- }
-
- @Test
- public void testReadSubscriptionDisplayData() {
- String topic = "projects/project/topics/topic";
- String subscription = "projects/project/subscriptions/subscription";
- Duration maxReadTime = Duration.standardMinutes(5);
- PubsubIO.Read<String> read = PubsubIO.<String>read()
- .subscription(StaticValueProvider.of(subscription))
- .timestampLabel("myTimestamp")
- .idLabel("myId");
-
- DisplayData displayData = DisplayData.from(read);
-
- assertThat(displayData, hasDisplayItem("subscription", subscription));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
- }
-
- @Test
- public void testNullTopic() {
- String subscription = "projects/project/subscriptions/subscription";
- PubsubIO.Read<String> read = PubsubIO.<String>read()
- .subscription(StaticValueProvider.of(subscription));
- assertNull(read.getTopic());
- assertNotNull(read.getSubscription());
- assertNotNull(DisplayData.from(read));
- }
-
- @Test
- public void testNullSubscription() {
- String topic = "projects/project/topics/topic";
- PubsubIO.Read<String> read = PubsubIO.<String>read()
- .topic(StaticValueProvider.of(topic));
- assertNotNull(read.getTopic());
- assertNull(read.getSubscription());
- assertNotNull(DisplayData.from(read));
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesUnboundedPCollections.class})
- public void testPrimitiveReadDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- Set<DisplayData> displayData;
- PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
-
- // Reading from a subscription.
- read = read.subscription("projects/project/subscriptions/subscription");
- displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat("PubsubIO.Read should include the subscription in its primitive display data",
- displayData, hasItem(hasDisplayItem("subscription")));
-
- // Reading from a topic.
- read = read.topic("projects/project/topics/topic");
- displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat("PubsubIO.Read should include the topic in its primitive display data",
- displayData, hasItem(hasDisplayItem("topic")));
- }
-
- @Test
- public void testWriteDisplayData() {
- String topic = "projects/project/topics/topic";
- PubsubIO.Write<?> write = PubsubIO.<String>write()
- .topic(topic)
- .timestampLabel("myTimestamp")
- .idLabel("myId");
-
- DisplayData displayData = DisplayData.from(write);
-
- assertThat(displayData, hasDisplayItem("topic", topic));
- assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
- assertThat(displayData, hasDisplayItem("idLabel", "myId"));
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testPrimitiveWriteDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("PubsubIO.Write should include the topic in its primitive display data",
- displayData, hasItem(hasDisplayItem("topic")));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
deleted file mode 100644
index 7a4be62..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.PubsubTestClient;
-import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test PubsubUnboundedSink.
- */
-@RunWith(JUnit4.class)
-public class PubsubUnboundedSinkTest implements Serializable {
- private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
- private static final String DATA = "testData";
- private static final Map<String, String> ATTRIBUTES =
- ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build();
- private static final long TIMESTAMP = 1234L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
- private static final int NUM_SHARDS = 10;
-
- private static class Stamp extends DoFn<String, String> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
- }
- }
-
- private String getRecordId(String data) {
- return Hashing.murmur3_128().hashBytes(data.getBytes()).toString();
- }
-
- @Rule
- public transient TestPipeline p = TestPipeline.create();
-
- @Test
- public void saneCoder() throws Exception {
- OutgoingMessage message = new OutgoingMessage(
- DATA.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(DATA));
- CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
- CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void sendOneMessage() throws IOException {
- List<OutgoingMessage> outgoing =
- ImmutableList.of(new OutgoingMessage(
- DATA.getBytes(),
- ATTRIBUTES,
- TIMESTAMP, getRecordId(DATA)));
- int batchSize = 1;
- int batchBytes = 1;
- try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
- ImmutableList.<OutgoingMessage>of())) {
- PubsubUnboundedSink<String> sink =
- new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
- TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
- Duration.standardSeconds(2),
- new SimpleFunction<String, PubsubIO.PubsubMessage>() {
- @Override
- public PubsubIO.PubsubMessage apply(String input) {
- return new PubsubIO.PubsubMessage(input.getBytes(), ATTRIBUTES);
- }
- },
- RecordIdMethod.DETERMINISTIC);
- p.apply(Create.of(ImmutableList.of(DATA)))
- .apply(ParDo.of(new Stamp()))
- .apply(sink);
- p.run();
- }
- // The PubsubTestClientFactory will assert fail on close if the actual published
- // message does not match the expected publish message.
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void sendMoreThanOneBatchByNumMessages() throws IOException {
- List<OutgoingMessage> outgoing = new ArrayList<>();
- List<String> data = new ArrayList<>();
- int batchSize = 2;
- int batchBytes = 1000;
- for (int i = 0; i < batchSize * 10; i++) {
- String str = String.valueOf(i);
- outgoing.add(new OutgoingMessage(
- str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
- data.add(str);
- }
- try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
- ImmutableList.<OutgoingMessage>of())) {
- PubsubUnboundedSink<String> sink =
- new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
- TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
- Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC);
- p.apply(Create.of(data))
- .apply(ParDo.of(new Stamp()))
- .apply(sink);
- p.run();
- }
- // The PubsubTestClientFactory will assert fail on close if the actual published
- // message does not match the expected publish message.
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void sendMoreThanOneBatchByByteSize() throws IOException {
- List<OutgoingMessage> outgoing = new ArrayList<>();
- List<String> data = new ArrayList<>();
- int batchSize = 100;
- int batchBytes = 10;
- int n = 0;
- while (n < batchBytes * 10) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < batchBytes; i++) {
- sb.append(String.valueOf(n));
- }
- String str = sb.toString();
- outgoing.add(new OutgoingMessage(
- str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str)));
- data.add(str);
- n += str.length();
- }
- try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
- ImmutableList.<OutgoingMessage>of())) {
- PubsubUnboundedSink<String> sink =
- new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
- StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
- NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
- null, RecordIdMethod.DETERMINISTIC);
- p.apply(Create.of(data))
- .apply(ParDo.of(new Stamp()))
- .apply(sink);
- p.run();
- }
- // The PubsubTestClientFactory will assert fail on close if the actual published
- // message does not match the expected publish message.
- }
-
- // TODO: We would like to test that failed Pubsub publish calls cause the already assigned
- // (and random) record ids to be reused. However that can't be done without the test runnner
- // supporting retrying bundles.
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
deleted file mode 100644
index d9df2ca..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import static junit.framework.TestCase.assertFalse;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.client.util.Clock;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint;
-import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader;
-import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.PubsubTestClient;
-import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
-import org.joda.time.Instant;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test PubsubUnboundedSource.
- */
-@RunWith(JUnit4.class)
-public class PubsubUnboundedSourceTest {
- private static final SubscriptionPath SUBSCRIPTION =
- PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
- private static final String DATA = "testData";
- private static final long TIMESTAMP = 1234L;
- private static final long REQ_TIME = 6373L;
- private static final String TIMESTAMP_LABEL = "timestamp";
- private static final String ID_LABEL = "id";
- private static final String ACK_ID = "testAckId";
- private static final String RECORD_ID = "testRecordId";
- private static final int ACK_TIMEOUT_S = 60;
-
- private AtomicLong now;
- private Clock clock;
- private PubsubTestClientFactory factory;
- private PubsubSource<String> primSource;
-
- @Rule
- public TestPipeline p = TestPipeline.create();
-
- private void setupOneMessage(Iterable<IncomingMessage> incoming) {
- now = new AtomicLong(REQ_TIME);
- clock = new Clock() {
- @Override
- public long currentTimeMillis() {
- return now.get();
- }
- };
- factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
- PubsubUnboundedSource<String> source =
- new PubsubUnboundedSource<>(
- clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
- StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null);
- primSource = new PubsubSource<>(source);
- }
-
- private void setupOneMessage() {
- setupOneMessage(ImmutableList.of(
- new IncomingMessage(DATA.getBytes(), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
- }
-
- @After
- public void after() throws IOException {
- factory.close();
- now = null;
- clock = null;
- primSource = null;
- factory = null;
- }
-
- @Test
- public void checkpointCoderIsSane() throws Exception {
- setupOneMessage(ImmutableList.<IncomingMessage>of());
- CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder());
- // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make
- // equals on checkpoints ignore those fields, we'll test serialization and deserialization
- // of checkpoints in multipleReaders below.
- }
-
- @Test
- public void readOneMessage() throws IOException {
- setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
- // Read one message.
- assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
- assertFalse(reader.advance());
- // ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- checkpoint.finalizeCheckpoint();
- reader.close();
- }
-
- @Test
- public void timeoutAckAndRereadOneMessage() throws IOException {
- setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
- PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
- assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
- // Let the ACK deadline for the above expire.
- now.addAndGet(65 * 1000);
- pubsubClient.advance();
- // We'll now receive the same message again.
- assertTrue(reader.advance());
- assertEquals(DATA, reader.getCurrent());
- assertFalse(reader.advance());
- // Now ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- checkpoint.finalizeCheckpoint();
- reader.close();
- }
-
- @Test
- public void extendAck() throws IOException {
- setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
- PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
- // Pull the first message but don't take a checkpoint for it.
- assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
- // Extend the ack
- now.addAndGet(55 * 1000);
- pubsubClient.advance();
- assertFalse(reader.advance());
- // Extend the ack again
- now.addAndGet(25 * 1000);
- pubsubClient.advance();
- assertFalse(reader.advance());
- // Now ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- checkpoint.finalizeCheckpoint();
- reader.close();
- }
-
- @Test
- public void timeoutAckExtensions() throws IOException {
- setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
- PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
- // Pull the first message but don't take a checkpoint for it.
- assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
- // Extend the ack.
- now.addAndGet(55 * 1000);
- pubsubClient.advance();
- assertFalse(reader.advance());
- // Let the ack expire.
- for (int i = 0; i < 3; i++) {
- now.addAndGet(25 * 1000);
- pubsubClient.advance();
- assertFalse(reader.advance());
- }
- // Wait for resend.
- now.addAndGet(25 * 1000);
- pubsubClient.advance();
- // Reread the same message.
- assertTrue(reader.advance());
- assertEquals(DATA, reader.getCurrent());
- // Now ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- checkpoint.finalizeCheckpoint();
- reader.close();
- }
-
- @Test
- public void multipleReaders() throws IOException {
- List<IncomingMessage> incoming = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- String data = String.format("data_%d", i);
- String ackid = String.format("ackid_%d", i);
- incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID));
- }
- setupOneMessage(incoming);
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
- // Consume two messages, only read one.
- assertTrue(reader.start());
- assertEquals("data_0", reader.getCurrent());
-
- // Grab checkpoint.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- checkpoint.finalizeCheckpoint();
- assertEquals(1, checkpoint.notYetReadIds.size());
- assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
-
- // Read second message.
- assertTrue(reader.advance());
- assertEquals("data_1", reader.getCurrent());
-
- // Restore from checkpoint.
- byte[] checkpointBytes =
- CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint);
- checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(),
- checkpointBytes);
- assertEquals(1, checkpoint.notYetReadIds.size());
- assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
-
- // Re-read second message.
- reader = primSource.createReader(p.getOptions(), checkpoint);
- assertTrue(reader.start());
- assertEquals("data_1", reader.getCurrent());
-
- // We are done.
- assertFalse(reader.advance());
-
- // ACK final message.
- checkpoint = reader.getCheckpointMark();
- checkpoint.finalizeCheckpoint();
- reader.close();
- }
-
- private long messageNumToTimestamp(int messageNum) {
- return TIMESTAMP + messageNum * 100;
- }
-
- @Test
- public void readManyMessages() throws IOException {
- Map<String, Integer> dataToMessageNum = new HashMap<>();
-
- final int m = 97;
- final int n = 10000;
- List<IncomingMessage> incoming = new ArrayList<>();
- for (int i = 0; i < n; i++) {
- // Make the messages timestamps slightly out of order.
- int messageNum = ((i / m) * m) + (m - 1) - (i % m);
- String data = String.format("data_%d", messageNum);
- dataToMessageNum.put(data, messageNum);
- String recid = String.format("recordid_%d", messageNum);
- String ackId = String.format("ackid_%d", messageNum);
- incoming.add(new IncomingMessage(data.getBytes(), null, messageNumToTimestamp(messageNum), 0,
- ackId, recid));
- }
- setupOneMessage(incoming);
-
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
- PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
-
- for (int i = 0; i < n; i++) {
- if (i == 0) {
- assertTrue(reader.start());
- } else {
- assertTrue(reader.advance());
- }
- // We'll checkpoint and ack within the 2min limit.
- now.addAndGet(30);
- pubsubClient.advance();
- String data = reader.getCurrent();
- Integer messageNum = dataToMessageNum.remove(data);
- // No duplicate messages.
- assertNotNull(messageNum);
- // Preserve timestamp.
- assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp());
- // Preserve record id.
- String recid = String.format("recordid_%d", messageNum);
- assertArrayEquals(recid.getBytes(), reader.getCurrentRecordId());
-
- if (i % 1000 == 999) {
- // Estimated watermark can never get ahead of actual outstanding messages.
- long watermark = reader.getWatermark().getMillis();
- long minOutstandingTimestamp = Long.MAX_VALUE;
- for (Integer outstandingMessageNum : dataToMessageNum.values()) {
- minOutstandingTimestamp =
- Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum));
- }
- assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
- // Ack messages, but only every other finalization.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- if (i % 2000 == 1999) {
- checkpoint.finalizeCheckpoint();
- }
- }
- }
- // We are done.
- assertFalse(reader.advance());
- // We saw each message exactly once.
- assertTrue(dataToMessageNum.isEmpty());
- reader.close();
- }
-
- @Test
- public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
- TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
- factory = PubsubTestClient.createFactoryForCreateSubscription();
- PubsubUnboundedSource<String> source =
- new PubsubUnboundedSource<>(
- factory,
- StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
- StaticValueProvider.of(topicPath),
- null,
- StringUtf8Coder.of(),
- null,
- null,
- null);
- assertThat(source.getSubscription(), nullValue());
-
- assertThat(source.getSubscription(), nullValue());
-
- PipelineOptions options = PipelineOptionsFactory.create();
- List<PubsubSource<String>> splits =
- (new PubsubSource<>(source)).generateInitialSplits(3, options);
- // We have at least one returned split
- assertThat(splits, hasSize(greaterThan(0)));
- for (PubsubSource<String> split : splits) {
- // Each split is equal
- assertThat(split, equalTo(splits.get(0)));
- }
-
- assertThat(splits.get(0).subscriptionPath, not(nullValue()));
- }
-
- @Test
- public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
- TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
- factory = PubsubTestClient.createFactoryForCreateSubscription();
- PubsubUnboundedSource<String> source =
- new PubsubUnboundedSource<>(
- factory,
- StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
- StaticValueProvider.of(topicPath),
- null,
- StringUtf8Coder.of(),
- null,
- null,
- null);
- assertThat(source.getSubscription(), nullValue());
-
- assertThat(source.getSubscription(), nullValue());
-
- PipelineOptions options = PipelineOptionsFactory.create();
- PubsubSource<String> actualSource = new PubsubSource<>(source);
- PubsubReader<String> reader = actualSource.createReader(options, null);
- SubscriptionPath createdSubscription = reader.subscription;
- assertThat(createdSubscription, not(nullValue()));
-
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
-
- checkpoint.finalizeCheckpoint();
- PubsubCheckpoint<String> deserCheckpoint =
- CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
- assertThat(checkpoint.subscriptionPath, not(nullValue()));
- assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
-
- PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
- PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
-
- assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
- assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
- }
-
- /**
- * Tests that checkpoints finalized after the reader is closed succeed.
- */
- @Test
- public void closeWithActiveCheckpoints() throws Exception {
- setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
- reader.start();
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
- reader.close();
- checkpoint.finalizeCheckpoint();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
deleted file mode 100644
index 1a99d38..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for helper classes and methods in PubsubClient.
- */
-@RunWith(JUnit4.class)
-public class PubsubClientTest {
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- //
- // Timestamp handling
- //
-
- private long parse(String timestamp) {
- Map<String, String> map = ImmutableMap.of("myLabel", timestamp);
- return PubsubClient.extractTimestamp("myLabel", null, map);
- }
-
- private void roundTripRfc339(String timestamp) {
- assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp));
- }
-
- private void truncatedRfc339(String timestamp, String truncatedTimestmap) {
- assertEquals(Instant.parse(truncatedTimestmap).getMillis(), parse(timestamp));
- }
-
- @Test
- public void noTimestampLabelReturnsPubsubPublish() {
- final long time = 987654321L;
- long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null);
- assertEquals(time, timestamp);
- }
-
- @Test
- public void noTimestampLabelAndInvalidPubsubPublishThrowsError() {
- thrown.expect(NumberFormatException.class);
- PubsubClient.extractTimestamp(null, "not-a-date", null);
- }
-
- @Test
- public void timestampLabelWithNullAttributesThrowsError() {
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
- PubsubClient.extractTimestamp("myLabel", null, null);
- }
-
- @Test
- public void timestampLabelSetWithMissingAttributeThrowsError() {
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel");
- Map<String, String> map = ImmutableMap.of("otherLabel", "whatever");
- PubsubClient.extractTimestamp("myLabel", null, map);
- }
-
- @Test
- public void timestampLabelParsesMillisecondsSinceEpoch() {
- long time = 1446162101123L;
- Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time));
- long timestamp = PubsubClient.extractTimestamp("myLabel", null, map);
- assertEquals(time, timestamp);
- }
-
- @Test
- public void timestampLabelParsesRfc3339Seconds() {
- roundTripRfc339("2015-10-29T23:41:41Z");
- }
-
- @Test
- public void timestampLabelParsesRfc3339Tenths() {
- roundTripRfc339("2015-10-29T23:41:41.1Z");
- }
-
- @Test
- public void timestampLabelParsesRfc3339Hundredths() {
- roundTripRfc339("2015-10-29T23:41:41.12Z");
- }
-
- @Test
- public void timestampLabelParsesRfc3339Millis() {
- roundTripRfc339("2015-10-29T23:41:41.123Z");
- }
-
- @Test
- public void timestampLabelParsesRfc3339Micros() {
- // Note: micros part 456/1000 is dropped.
- truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z");
- }
-
- @Test
- public void timestampLabelParsesRfc3339MicrosRounding() {
- // Note: micros part 999/1000 is dropped, not rounded up.
- truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z");
- }
-
- @Test
- public void timestampLabelWithInvalidFormatThrowsError() {
- thrown.expect(NumberFormatException.class);
- parse("not-a-timestamp");
- }
-
- @Test
- public void timestampLabelWithInvalidFormat2ThrowsError() {
- thrown.expect(NumberFormatException.class);
- parse("null");
- }
-
- @Test
- public void timestampLabelWithInvalidFormat3ThrowsError() {
- thrown.expect(NumberFormatException.class);
- parse("2015-10");
- }
-
- @Test
- public void timestampLabelParsesRfc3339WithSmallYear() {
- // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted
- // This is therefore a "small year" until this difference is reconciled.
- roundTripRfc339("1582-10-15T01:23:45.123Z");
- }
-
- @Test
- public void timestampLabelParsesRfc3339WithLargeYear() {
- // Year 9999 in range.
- roundTripRfc339("9999-10-29T23:41:41.123999Z");
- }
-
- @Test
- public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
- thrown.expect(NumberFormatException.class);
- // Year 10000 out of range.
- parse("10000-10-29T23:41:41.123999Z");
- }
-
- //
- // Paths
- //
-
- @Test
- public void projectPathFromIdWellFormed() {
- ProjectPath path = PubsubClient.projectPathFromId("test");
- assertEquals("projects/test", path.getPath());
- }
-
- @Test
- public void subscriptionPathFromNameWellFormed() {
- SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
- assertEquals("projects/test/subscriptions/something", path.getPath());
- assertEquals("/subscriptions/test/something", path.getV1Beta1Path());
- }
-
- @Test
- public void topicPathFromNameWellFormed() {
- TopicPath path = PubsubClient.topicPathFromName("test", "something");
- assertEquals("projects/test/topics/something", path.getPath());
- assertEquals("/topics/test/something", path.getV1Beta1Path());
- }
-}