You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:34 UTC

[26/55] [abbrv] beam git commit: Remove NexmarkDrivers and make execution runner-agnostic

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
deleted file mode 100644
index afddbd8..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.integration.nexmark.io;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.Pubsub.Builder;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
-import com.google.api.services.pubsub.model.ListTopicsResponse;
-import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.util.Transport;
-
-/**
- * A Pubsub client using JSON transport.
- */
-class PubsubJsonClient extends PubsubClient {
-
-  private static class PubsubJsonClientFactory implements PubsubClientFactory {
-    private static HttpRequestInitializer chainHttpRequestInitializer(
-        Credentials credential, HttpRequestInitializer httpRequestInitializer) {
-      if (credential == null) {
-        return httpRequestInitializer;
-      } else {
-        return new ChainingHttpRequestInitializer(
-            new HttpCredentialsAdapter(credential),
-            httpRequestInitializer);
-      }
-    }
-
-    @Override
-    public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-        throws IOException {
-      Pubsub pubsub = new Builder(
-          Transport.getTransport(),
-          Transport.getJsonFactory(),
-          chainHttpRequestInitializer(
-              options.getGcpCredential(),
-              // Do not log 404. It clutters the output and is possibly even required by the caller.
-              new RetryHttpRequestInitializer(ImmutableList.of(404))))
-          .setRootUrl(options.getPubsubRootUrl())
-          .setApplicationName(options.getAppName())
-          .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
-          .build();
-      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
-    }
-
-    @Override
-    public String getKind() {
-      return "Json";
-    }
-  }
-
-  /**
-   * Factory for creating Pubsub clients using Json transport.
-   */
-  public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();
-
-  /**
-   * Label to use for custom timestamps, or {@literal null} if the Pubsub publish time should be
-   * used instead.
-   */
-  @Nullable
-  private final String timestampLabel;
-
-  /**
-   * Label to use for custom ids, or {@literal null} if Pubsub-provided ids should be used.
-   */
-  @Nullable
-  private final String idLabel;
-
-  /**
-   * Underlying JSON transport.
-   */
-  private Pubsub pubsub;
-
-  @VisibleForTesting PubsubJsonClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
-      Pubsub pubsub) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
-    this.pubsub = pubsub;
-  }
-
-  @Override
-  public void close() {
-    // Nothing to close.
-  }
-
-  @Override
-  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
-      throws IOException {
-    List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
-    for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
-
-      Map<String, String> attributes = outgoingMessage.attributes;
-      if ((timestampLabel != null || idLabel != null) && attributes == null) {
-        attributes = new TreeMap<>();
-      }
-      if (attributes != null) {
-        pubsubMessage.setAttributes(attributes);
-      }
-
-      if (timestampLabel != null) {
-        attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
-        attributes.put(idLabel, outgoingMessage.recordId);
-      }
-
-      pubsubMessages.add(pubsubMessage);
-    }
-    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
-    PublishResponse response = pubsub.projects()
-                                     .topics()
-                                     .publish(topic.getPath(), request)
-                                     .execute();
-    return response.getMessageIds().size();
-  }
-
-  @Override
-  public List<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch,
-      SubscriptionPath subscription,
-      int batchSize,
-      boolean returnImmediately) throws IOException {
-    PullRequest request = new PullRequest()
-        .setReturnImmediately(returnImmediately)
-        .setMaxMessages(batchSize);
-    PullResponse response = pubsub.projects()
-                                  .subscriptions()
-                                  .pull(subscription.getPath(), request)
-                                  .execute();
-    if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) {
-      return ImmutableList.of();
-    }
-    List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size());
-    for (ReceivedMessage message : response.getReceivedMessages()) {
-      PubsubMessage pubsubMessage = message.getMessage();
-      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
-
-      // Payload.
-      byte[] elementBytes = pubsubMessage.decodeData();
-
-      // Timestamp.
-      long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes);
-
-      // Ack id.
-      String ackId = message.getAckId();
-      checkState(!Strings.isNullOrEmpty(ackId));
-
-      // Record id, if any.
-      @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
-      }
-      if (Strings.isNullOrEmpty(recordId)) {
-        // Fall back to the Pubsub provided message id.
-        recordId = pubsubMessage.getMessageId();
-      }
-
-      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
-                                               requestTimeMsSinceEpoch, ackId, recordId));
-    }
-
-    return incomingMessages;
-  }
-
-  @Override
-  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
-    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
-    pubsub.projects()
-          .subscriptions()
-          .acknowledge(subscription.getPath(), request)
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
-      throws IOException {
-    ModifyAckDeadlineRequest request =
-        new ModifyAckDeadlineRequest().setAckIds(ackIds)
-                                      .setAckDeadlineSeconds(deadlineSeconds);
-    pubsub.projects()
-          .subscriptions()
-          .modifyAckDeadline(subscription.getPath(), request)
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    pubsub.projects()
-          .topics()
-          .create(topic.getPath(), new Topic())
-          .execute(); // ignore Topic result.
-  }
-
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    pubsub.projects()
-          .topics()
-          .delete(topic.getPath())
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
-    ListTopicsResponse response = pubsub.projects()
-                                        .topics()
-                                        .list(project.getPath())
-                                        .execute();
-    if (response.getTopics() == null || response.getTopics().isEmpty()) {
-      return ImmutableList.of();
-    }
-    List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
-    for (Topic topic : response.getTopics()) {
-      topics.add(topicPathFromPath(topic.getName()));
-    }
-    return topics;
-  }
-
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException {
-    Subscription request = new Subscription()
-        .setTopic(topic.getPath())
-        .setAckDeadlineSeconds(ackDeadlineSeconds);
-    pubsub.projects()
-          .subscriptions()
-          .create(subscription.getPath(), request)
-          .execute(); // ignore Subscription result.
-  }
-
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-    pubsub.projects()
-          .subscriptions()
-          .delete(subscription.getPath())
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
-      throws IOException {
-    ListSubscriptionsResponse response = pubsub.projects()
-                                               .subscriptions()
-                                               .list(project.getPath())
-                                               .execute();
-    if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) {
-      return ImmutableList.of();
-    }
-    List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size());
-    for (Subscription subscription : response.getSubscriptions()) {
-      if (subscription.getTopic().equals(topic.getPath())) {
-        subscriptions.add(subscriptionPathFromPath(subscription.getName()));
-      }
-    }
-    return subscriptions;
-  }
-
-  @Override
-  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
-    Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute();
-    return response.getAckDeadlineSeconds();
-  }
-
-  @Override
-  public boolean isEOF() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
deleted file mode 100644
index 69ba2b0..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.integration.nexmark.io;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.util.Clock;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
-
-/**
- * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
- * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
- * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
- */
-class PubsubTestClient extends PubsubClient implements Serializable {
-  /**
-   * Mimic the state of the simulated Pubsub 'service'.
-   *
-   * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
-   * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
-   * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
-   * following data structures over all clients and must resort to a static.
-   */
-  private static class State {
-    /**
-     * True if the state has been primed for a test but not yet validated.
-     */
-    boolean isActive;
-
-    /**
-     * Publish mode only: Only publish calls for this topic are allowed.
-     */
-    @Nullable
-    TopicPath expectedTopic;
-
-    /**
-     * Publish mode only: Messages yet to be seen in a {@link #publish} call.
-     */
-    @Nullable
-    Set<OutgoingMessage> remainingExpectedOutgoingMessages;
-
-    /**
-     * Publish mode only: Messages which should throw when first sent to simulate transient publish
-     * failure.
-     */
-    @Nullable
-    Set<OutgoingMessage> remainingFailingOutgoingMessages;
-
-    /**
-     * Pull mode only: Clock from which to get current time.
-     */
-    @Nullable
-    Clock clock;
-
-    /**
-     * Pull mode only: Only pull calls for this subscription are allowed.
-     */
-    @Nullable
-    SubscriptionPath expectedSubscription;
-
-    /**
-     * Pull mode only: Timeout to simulate.
-     */
-    int ackTimeoutSec;
-
-    /**
-     * Pull mode only: Messages waiting to be received by a {@link #pull} call.
-     */
-    @Nullable
-    List<IncomingMessage> remainingPendingIncomingMessages;
-
-    /**
-     * Pull mode only: Messages which have been returned from a {@link #pull} call and
-     * not yet ACKed by an {@link #acknowledge} call.
-     */
-    @Nullable
-    Map<String, IncomingMessage> pendingAckIncomingMessages;
-
-    /**
-     * Pull mode only: When above messages are due to have their ACK deadlines expire.
-     */
-    @Nullable
-    Map<String, Long> ackDeadline;
-  }
-
-  private static final State STATE = new State();
-
-  /** Closing the factory will validate all expected messages were processed. */
-  public interface PubsubTestClientFactory
-          extends PubsubClientFactory, Closeable, Serializable {
-  }
-
-  /**
-   * Return a factory for testing publishers. Only one factory may be in-flight at a time.
-   * The factory must be closed when the test is complete, at which point final validation will
-   * occur.
-   */
-  static PubsubTestClientFactory createFactoryForPublish(
-      final TopicPath expectedTopic,
-      final Iterable<OutgoingMessage> expectedOutgoingMessages,
-      final Iterable<OutgoingMessage> failingOutgoingMessages) {
-    synchronized (STATE) {
-      checkState(!STATE.isActive, "Test still in flight");
-      STATE.expectedTopic = expectedTopic;
-      STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
-      STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
-      STATE.isActive = true;
-    }
-    return new PubsubTestClientFactory() {
-      @Override
-      public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-          throws IOException {
-        return new PubsubTestClient();
-      }
-
-      @Override
-      public String getKind() {
-        return "PublishTest";
-      }
-
-      @Override
-      public void close() {
-        synchronized (STATE) {
-          checkState(STATE.isActive, "No test still in flight");
-          checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
-                     "Still waiting for %s messages to be published",
-                     STATE.remainingExpectedOutgoingMessages.size());
-          STATE.isActive = false;
-          STATE.remainingExpectedOutgoingMessages = null;
-        }
-      }
-    };
-  }
-
-  /**
-   * Return a factory for testing subscribers. Only one factory may be in-flight at a time.
-   * The factory must be closed when the test is complete.
-   */
-  public static PubsubTestClientFactory createFactoryForPull(
-      final Clock clock,
-      final SubscriptionPath expectedSubscription,
-      final int ackTimeoutSec,
-      final Iterable<IncomingMessage> expectedIncomingMessages) {
-    synchronized (STATE) {
-      checkState(!STATE.isActive, "Test still in flight");
-      STATE.clock = clock;
-      STATE.expectedSubscription = expectedSubscription;
-      STATE.ackTimeoutSec = ackTimeoutSec;
-      STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
-      STATE.pendingAckIncomingMessages = new HashMap<>();
-      STATE.ackDeadline = new HashMap<>();
-      STATE.isActive = true;
-    }
-    return new PubsubTestClientFactory() {
-      @Override
-      public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-          throws IOException {
-        return new PubsubTestClient();
-      }
-
-      @Override
-      public String getKind() {
-        return "PullTest";
-      }
-
-      @Override
-      public void close() {
-        synchronized (STATE) {
-          checkState(STATE.isActive, "No test still in flight");
-          checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
-                     "Still waiting for %s messages to be pulled",
-                     STATE.remainingPendingIncomingMessages.size());
-          checkState(STATE.pendingAckIncomingMessages.isEmpty(),
-                     "Still waiting for %s messages to be ACKed",
-                     STATE.pendingAckIncomingMessages.size());
-          checkState(STATE.ackDeadline.isEmpty(),
-                     "Still waiting for %s messages to be ACKed",
-                     STATE.ackDeadline.size());
-          STATE.isActive = false;
-          STATE.remainingPendingIncomingMessages = null;
-          STATE.pendingAckIncomingMessages = null;
-          STATE.ackDeadline = null;
-        }
-      }
-    };
-  }
-
-  public static PubsubTestClientFactory createFactoryForCreateSubscription() {
-    return new PubsubTestClientFactory() {
-      int numCalls = 0;
-
-      @Override
-      public void close() throws IOException {
-        checkState(
-            numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
-      }
-
-      @Override
-      public PubsubClient newClient(
-          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
-          throws IOException {
-        return new PubsubTestClient() {
-          @Override
-          public void createSubscription(
-              TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
-              throws IOException {
-            checkState(numCalls == 0, "Expected at most one subscription to be created");
-            numCalls++;
-          }
-        };
-      }
-
-      @Override
-      public String getKind() {
-        return "CreateSubscriptionTest";
-      }
-    };
-  }
-
-  /**
-   * Return true if in pull mode.
-   */
-  private boolean inPullMode() {
-    checkState(STATE.isActive, "No test is active");
-    return STATE.expectedSubscription != null;
-  }
-
-  /**
-   * Return true if in publish mode.
-   */
-  private boolean inPublishMode() {
-    checkState(STATE.isActive, "No test is active");
-    return STATE.expectedTopic != null;
-  }
-
-  /**
-   * For pull mode only:
-   * Track progression of time according to the {@link Clock} passed to
-   * {@link #createFactoryForPull}. This will simulate Pubsub expiring
-   * outstanding ACKs.
-   */
-  public void advance() {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only advance in pull mode");
-      // Any messages whose ACKs timed out are available for re-pulling.
-      Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
-      while (deadlineItr.hasNext()) {
-        Map.Entry<String, Long> entry = deadlineItr.next();
-        if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
-          STATE.remainingPendingIncomingMessages.add(
-              STATE.pendingAckIncomingMessages.remove(entry.getKey()));
-          deadlineItr.remove();
-        }
-      }
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public int publish(
-      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
-    synchronized (STATE) {
-      checkState(inPublishMode(), "Can only publish in publish mode");
-      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
-                 STATE.expectedTopic);
-      for (OutgoingMessage outgoingMessage : outgoingMessages) {
-        if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
-          throw new RuntimeException("Simulating failure for " + outgoingMessage);
-        }
-        checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
-                   "Unexpected outgoing message %s", outgoingMessage);
-      }
-      return outgoingMessages.size();
-    }
-  }
-
-  @Override
-  public List<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
-      boolean returnImmediately) throws IOException {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only pull in pull mode");
-      long now = STATE.clock.currentTimeMillis();
-      checkState(requestTimeMsSinceEpoch == now,
-                 "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch);
-      checkState(subscription.equals(STATE.expectedSubscription),
-                 "Subscription %s does not match expected %s", subscription,
-                 STATE.expectedSubscription);
-      checkState(returnImmediately, "Pull only supported if returning immediately");
-
-      List<IncomingMessage> incomingMessages = new ArrayList<>();
-      Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
-      while (pendItr.hasNext()) {
-        IncomingMessage incomingMessage = pendItr.next();
-        pendItr.remove();
-        IncomingMessage incomingMessageWithRequestTime =
-            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
-        incomingMessages.add(incomingMessageWithRequestTime);
-        STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
-                                             incomingMessageWithRequestTime);
-        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
-                              requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
-        if (incomingMessages.size() >= batchSize) {
-          break;
-        }
-      }
-      return incomingMessages;
-    }
-  }
-
-  @Override
-  public void acknowledge(
-      SubscriptionPath subscription,
-      List<String> ackIds) throws IOException {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only acknowledge in pull mode");
-      checkState(subscription.equals(STATE.expectedSubscription),
-                 "Subscription %s does not match expected %s", subscription,
-                 STATE.expectedSubscription);
-
-      for (String ackId : ackIds) {
-        checkState(STATE.ackDeadline.remove(ackId) != null,
-                   "No message with ACK id %s is waiting for an ACK", ackId);
-        checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
-                   "No message with ACK id %s is waiting for an ACK", ackId);
-      }
-    }
-  }
-
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
-      checkState(subscription.equals(STATE.expectedSubscription),
-                 "Subscription %s does not match expected %s", subscription,
-                 STATE.expectedSubscription);
-
-      for (String ackId : ackIds) {
-        if (deadlineSeconds > 0) {
-          checkState(STATE.ackDeadline.remove(ackId) != null,
-                     "No message with ACK id %s is waiting for an ACK", ackId);
-          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
-                     "No message with ACK id %s is waiting for an ACK", ackId);
-          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000);
-        } else {
-          checkState(STATE.ackDeadline.remove(ackId) != null,
-                     "No message with ACK id %s is waiting for an ACK", ackId);
-          IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
-          checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
-          STATE.remainingPendingIncomingMessages.add(message);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<SubscriptionPath> listSubscriptions(
-      ProjectPath project, TopicPath topic) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
-    synchronized (STATE) {
-      return STATE.ackTimeoutSec;
-    }
-  }
-
-  @Override
-  public boolean isEOF() {
-    synchronized (STATE) {
-      checkState(inPullMode(), "Can only check EOF in pull mode");
-      return STATE.remainingPendingIncomingMessages.isEmpty();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
deleted file mode 100644
index 1161f3e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Nexmark Beam IO related utilities.
- */
-package org.apache.beam.integration.nexmark.io;

http://git-wip-us.apache.org/repos/asf/beam/blob/a6dbdfa5/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
index c5d7725..d95461a 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.integration.nexmark.sources;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 
@@ -38,7 +38,7 @@ public class BoundedEventSourceTest {
 
   @Test
   public void sourceAndReadersWork() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class);
     long n = 200L;
     BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1);
 
@@ -48,7 +48,7 @@ public class BoundedEventSourceTest {
 
   @Test
   public void splitAtFractionRespectsContract() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class);
     long n = 20L;
     BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1);
 
@@ -62,7 +62,7 @@ public class BoundedEventSourceTest {
 
   @Test
   public void splitIntoBundlesRespectsContract() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class);
     long n = 200L;
     BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1);
     SourceTestUtils.assertSourcesEqualReferenceSource(