Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:03 UTC

[39/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
deleted file mode 100644
index 5ab8582..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
-import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import io.grpc.Status.Code;
-import io.grpc.StatusRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
- * service.
- */
-class BigtableServiceImpl implements BigtableService {
-  private static final Logger logger = LoggerFactory.getLogger(BigtableServiceImpl.class);
-
-  public BigtableServiceImpl(BigtableOptions options) {
-    this.options = options;
-  }
-
-  private final BigtableOptions options;
-
-  @Override
-  public BigtableWriterImpl openForWriting(String tableId) throws IOException {
-    BigtableSession session = new BigtableSession(options);
-    String tableName = options.getClusterName().toTableNameStr(tableId);
-    return new BigtableWriterImpl(session, tableName);
-  }
-
-  @Override
-  public boolean tableExists(String tableId) throws IOException {
-    if (!BigtableSession.isAlpnProviderEnabled()) {
-      logger.info(
-          "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
-              + " configured.",
-          tableId,
-          options);
-      return true;
-    }
-
-    try (BigtableSession session = new BigtableSession(options)) {
-      GetTableRequest getTable =
-          GetTableRequest.newBuilder()
-              .setName(options.getClusterName().toTableNameStr(tableId))
-              .build();
-      session.getTableAdminClient().getTable(getTable);
-      return true;
-    } catch (StatusRuntimeException e) {
-      if (e.getStatus().getCode() == Code.NOT_FOUND) {
-        return false;
-      }
-      String message =
-          String.format(
-              "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
-      logger.error(message, e);
-      throw new IOException(message, e);
-    }
-  }
-
-  private class BigtableReaderImpl implements Reader {
-    private BigtableSession session;
-    private final BigtableSource source;
-    private ResultScanner<Row> results;
-    private Row currentRow;
-
-    public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
-      this.session = session;
-      this.source = source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      RowRange range =
-          RowRange.newBuilder()
-              .setStartKey(source.getRange().getStartKey().getValue())
-              .setEndKey(source.getRange().getEndKey().getValue())
-              .build();
-      ReadRowsRequest.Builder requestB =
-          ReadRowsRequest.newBuilder()
-              .setRowRange(range)
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
-      if (source.getRowFilter() != null) {
-        requestB.setFilter(source.getRowFilter());
-      }
-      results = session.getDataClient().readRows(requestB.build());
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      currentRow = results.next();
-      return (currentRow != null);
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Goal: by the end of this function, both results and session are null and closed,
-      // independent of what errors they throw or prior state.
-
-      if (session == null) {
-        // Only possible when previously closed, so we know that results is also null.
-        return;
-      }
-
-      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
-      // the Closer, but we can use the Closer to simplify the error handling.
-      try (Closer closer = Closer.create()) {
-        if (results != null) {
-          closer.register(results);
-          results = null;
-        }
-
-        session.close();
-      } finally {
-        session = null;
-      }
-    }
-
-    @Override
-    public Row getCurrentRow() throws NoSuchElementException {
-      if (currentRow == null) {
-        throw new NoSuchElementException();
-      }
-      return currentRow;
-    }
-  }
-
-  private static class BigtableWriterImpl implements Writer {
-    private BigtableSession session;
-    private AsyncExecutor executor;
-    private final MutateRowRequest.Builder partialBuilder;
-
-    public BigtableWriterImpl(BigtableSession session, String tableName) {
-      this.session = session;
-      this.executor =
-          new AsyncExecutor(
-              session.getDataClient(),
-              new HeapSizeManager(
-                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
-                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
-      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (executor != null) {
-          executor.flush();
-          executor = null;
-        }
-      } finally {
-        if (session != null) {
-          session.close();
-          session = null;
-        }
-      }
-    }
-
-    @Override
-    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException {
-      MutateRowRequest r =
-          partialBuilder
-              .clone()
-              .setRowKey(record.getKey())
-              .addAllMutations(record.getValue())
-              .build();
-      try {
-        return executor.mutateRowAsync(r);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Write interrupted", e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects
-        .toStringHelper(BigtableServiceImpl.class)
-        .add("options", options)
-        .toString();
-  }
-
-  @Override
-  public Reader createReader(BigtableSource source) throws IOException {
-    BigtableSession session = new BigtableSession(options);
-    return new BigtableReaderImpl(session, source);
-  }
-
-  @Override
-  public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
-    try (BigtableSession session = new BigtableSession(options)) {
-      SampleRowKeysRequest request =
-          SampleRowKeysRequest.newBuilder()
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
-              .build();
-      return session.getDataClient().sampleRowKeys(request);
-    }
-  }
-}
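
A note on the close() logic in BigtableReaderImpl above: Guava's Closer only manages java.io.Closeable resources, while BigtableSession is merely AutoCloseable, so the session has to be closed in the body of the try. A minimal standalone sketch of that idiom, assuming only Guava; the class and both parameter names are illustrative, not part of the SDK:

    import com.google.common.io.Closer;

    import java.io.Closeable;

    final class TwoResourceCloser {
      // 'results' is registered with the Closer, which guarantees it is closed
      // even if closing the session throws; if both throw, the session's
      // exception is primary and the Closer's is attached as suppressed by
      // try-with-resources.
      static void closeBoth(Closeable results, AutoCloseable session) throws Exception {
        try (Closer closer = Closer.create()) {
          if (results != null) {
            closer.register(results);
          }
          session.close(); // AutoCloseable only: closed manually inside the try.
        }
      }
    }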

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
deleted file mode 100644
index 112a954..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines transforms for reading and writing from Google Cloud Bigtable.
- *
- * @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
deleted file mode 100644
index de0bd86..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines transforms for reading and writing common storage formats, including
- * {@link com.google.cloud.dataflow.sdk.io.AvroIO},
- * {@link com.google.cloud.dataflow.sdk.io.BigQueryIO}, and
- * {@link com.google.cloud.dataflow.sdk.io.TextIO}.
- *
- * <p>The classes in this package provide {@code Read} transforms that create PCollections
- * from existing storage:
- * <pre>{@code
- * PCollection<TableRow> inputData = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .from("clouddataflow-readonly:samples.weather_stations");
- * }</pre>
- * and {@code Write} transforms that persist PCollections to external storage:
- * <pre> {@code
- * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers"));
- * } </pre>
- */
-package com.google.cloud.dataflow.sdk.io;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
deleted file mode 100644
index 30772da..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ByteString.ByteIterator;
-
-import java.io.Serializable;
-
-/**
- * A class representing a key consisting of an array of bytes. Arbitrary-length
- * {@code byte[]} keys are typical in key-value stores such as Google Cloud Bigtable.
- *
- * <p>Instances of {@link ByteKey} are immutable.
- *
- * <p>{@link ByteKey} implements {@link Comparable Comparable&lt;ByteKey&gt;} by comparing the
- * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the successor
- * to a key is the same key with an additional 0 byte appended; and keys have unbounded size.
- *
- * <p>Note that the empty {@link ByteKey} compares smaller than all other keys, but some systems
- * have the semantic that when an empty {@link ByteKey} is used as an upper bound, it represents
- * the largest possible key. In these cases, implementors should use {@link #isEmpty} to test
- * whether an upper bound key is empty.
- */
-public final class ByteKey implements Comparable<ByteKey>, Serializable {
-  /** An empty key. */
-  public static final ByteKey EMPTY = ByteKey.of();
-
-  /**
-   * Creates a new {@link ByteKey} backed by the specified {@link ByteString}.
-   */
-  public static ByteKey of(ByteString value) {
-    return new ByteKey(value);
-  }
-
-  /**
-   * Creates a new {@link ByteKey} backed by a copy of the specified {@code byte[]}.
-   *
-   * <p>Makes a copy of the underlying array.
-   */
-  public static ByteKey copyFrom(byte[] bytes) {
-    return of(ByteString.copyFrom(bytes));
-  }
-
-  /**
-   * Creates a new {@link ByteKey} backed by a copy of the specified {@code int[]}. This method is
-   * primarily used as a convenience to create a {@link ByteKey} in code without casting down to
-   * signed Java {@link Byte bytes}:
-   *
-   * <pre>{@code
-   * ByteKey key = ByteKey.of(0xde, 0xad, 0xbe, 0xef);
-   * }</pre>
-   *
-   * <p>Makes a copy of the input.
-   */
-  public static ByteKey of(int... bytes) {
-    byte[] ret = new byte[bytes.length];
-    for (int i = 0; i < bytes.length; ++i) {
-      ret[i] = (byte) (bytes[i] & 0xff);
-    }
-    return ByteKey.copyFrom(ret);
-  }
-
-  /**
-   * Returns an immutable {@link ByteString} representing this {@link ByteKey}.
-   *
-   * <p>Does not copy.
-   */
-  public ByteString getValue() {
-    return value;
-  }
-
-  /**
-   * Returns a newly-allocated {@code byte[]} representing this {@link ByteKey}.
-   *
-   * <p>Copies the underlying {@code byte[]}.
-   */
-  public byte[] getBytes() {
-    return value.toByteArray();
-  }
-
-  /**
-   * Returns {@code true} if the {@code byte[]} backing this {@link ByteKey} is of length 0.
-   */
-  public boolean isEmpty() {
-    return value.isEmpty();
-  }
-
-  /**
-   * {@link ByteKey} implements {@link Comparable Comparable&lt;ByteKey&gt;} by comparing the
-   * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the
-   * successor to a key is the same key with an additional 0 byte appended; and keys have unbounded
-   * size.
-   */
-  @Override
-  public int compareTo(ByteKey other) {
-    checkNotNull(other, "other");
-    ByteIterator thisIt = value.iterator();
-    ByteIterator otherIt = other.value.iterator();
-    while (thisIt.hasNext() && otherIt.hasNext()) {
-      // (byte & 0xff) converts [-128,127] bytes to [0,255] ints.
-      int cmp = (thisIt.nextByte() & 0xff) - (otherIt.nextByte() & 0xff);
-      if (cmp != 0) {
-        return cmp;
-      }
-    }
-    // If we get here, the prefix of both arrays is equal up to the shorter array. The array with
-    // more bytes is larger.
-    return value.size() - other.value.size();
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  private final ByteString value;
-
-  private ByteKey(ByteString value) {
-    this.value = value;
-  }
-
-  /** Array used as a helper in {@link #toString}. */
-  private static final char[] HEX =
-      new char[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
-
-  // Prints the key as a string "[deadbeef]".
-  @Override
-  public String toString() {
-    char[] encoded = new char[2 * value.size() + 2];
-    encoded[0] = '[';
-    int cnt = 1;
-    ByteIterator iterator = value.iterator();
-    while (iterator.hasNext()) {
-      byte b = iterator.nextByte();
-      encoded[cnt] = HEX[(b & 0xF0) >>> 4];
-      ++cnt;
-      encoded[cnt] = HEX[b & 0xF];
-      ++cnt;
-    }
-    encoded[cnt] = ']';
-    return new String(encoded);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (!(o instanceof ByteKey)) {
-      return false;
-    }
-    ByteKey other = (ByteKey) o;
-    return (other.value.size() == value.size()) && this.compareTo(other) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return value.hashCode();
-  }
-}
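
ByteKey's ordering is plain unsigned lexicographic comparison, which is easy to get wrong with Java's signed bytes. A short usage sketch based only on the API above; the assertions restate the ordering rules from the class Javadoc:

    import com.google.cloud.dataflow.sdk.io.range.ByteKey;

    public class ByteKeyDemo {
      public static void main(String[] args) {
        ByteKey a = ByteKey.of(0x00);                                 // [00]
        ByteKey b = ByteKey.of(0x80);                                 // [80]
        ByteKey c = ByteKey.copyFrom(new byte[] {(byte) 0x80, 0x00}); // [8000]

        // The empty key is the smallest; (byte) 0x80 is negative in Java but
        // compares here as the unsigned value 128; a strict prefix sorts first.
        assert ByteKey.EMPTY.compareTo(a) < 0;
        assert a.compareTo(b) < 0;
        assert b.compareTo(c) < 0;

        System.out.println(b); // prints "[80]", per ByteKey.toString()
      }
    }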

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
deleted file mode 100644
index 6f58d39..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * A class representing a range of {@link ByteKey ByteKeys}.
- *
- * <p>Instances of {@link ByteKeyRange} are immutable.
- *
- * <p>A {@link ByteKeyRange} enforces the restriction that its start and end keys must form a valid,
- * non-empty range {@code [startKey, endKey)} that is inclusive of the start key and exclusive of
- * the end key.
- *
- * <p>When the end key is empty, it is treated as the largest possible key.
- *
- * <h3>Interpreting {@link ByteKey} in a {@link ByteKeyRange}</h3>
- *
- * <p>The primary role of {@link ByteKeyRange} is to provide functionality for
- * {@link #estimateFractionForKey(ByteKey)}, {@link #interpolateKey(double)}, and
- * {@link #split(int)}, which are used for Google Cloud Dataflow's
- * <a href="https://cloud.google.com/dataflow/service/dataflow-service-desc#AutoScaling">Autoscaling
- * and Dynamic Work Rebalancing</a> features.
- *
- * <p>{@link ByteKeyRange} implements these features by treating a {@link ByteKey}'s underlying
- * {@code byte[]} as the binary expansion of floating point numbers in the range {@code [0.0, 1.0]}.
- * For example, the keys {@code ByteKey.of(0x80)}, {@code ByteKey.of(0xc0)}, and
- * {@code ByteKey.of(0xe0)} are interpreted as {@code 0.5}, {@code 0.75}, and {@code 0.875}
- * respectively. The empty {@code ByteKey.EMPTY} is interpreted as {@code 0.0} when used as the
- * start of a range and {@code 1.0} when used as the end key.
- *
- * <p>Key interpolation, fraction estimation, and range splitting are all interpreted in these
- * floating-point semantics. See the respective implementations for further details. <b>Note:</b>
- * the underlying implementations of these functions use {@link BigInteger} and {@link BigDecimal},
- * so they can be slow and should not be called in hot loops. Dataflow's dynamic work
- * rebalancing will only invoke these functions during periodic control operations, so they are not
- * called on the critical path.
- *
- * @see ByteKey
- */
-public final class ByteKeyRange implements Serializable {
-  private static final Logger logger = LoggerFactory.getLogger(ByteKeyRange.class);
-
-  /** The range of all keys, with empty start and end keys. */
-  public static final ByteKeyRange ALL_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.EMPTY);
-
-  /**
-   * Creates a new {@link ByteKeyRange} with the given start and end keys.
-   *
-   * <p>Note that if {@code endKey} is empty, it is treated as the largest possible key.
-   *
-   * @see ByteKeyRange
-   *
-   * @throws IllegalArgumentException if {@code endKey} is less than or equal to {@code startKey},
-   *     unless {@code endKey} is empty indicating the maximum possible {@link ByteKey}.
-   */
-  public static ByteKeyRange of(ByteKey startKey, ByteKey endKey) {
-    return new ByteKeyRange(startKey, endKey);
-  }
-
-  /**
-   * Returns the {@link ByteKey} representing the lower bound of this {@link ByteKeyRange}.
-   */
-  public ByteKey getStartKey() {
-    return startKey;
-  }
-
-  /**
-   * Returns the {@link ByteKey} representing the upper bound of this {@link ByteKeyRange}.
-   *
-   * <p>Note that if {@code endKey} is empty, it is treated as the largest possible key.
-   */
-  public ByteKey getEndKey() {
-    return endKey;
-  }
-
-  /**
-   * Returns {@code true} if the specified {@link ByteKey} is contained within this range.
-   */
-  public boolean containsKey(ByteKey key) {
-    return key.compareTo(startKey) >= 0 && endsAfterKey(key);
-  }
-
-  /**
-   * Returns {@code true} if the specified {@link ByteKeyRange} overlaps this range.
-   */
-  public boolean overlaps(ByteKeyRange other) {
-    // If each range starts before the other range ends, then they must overlap.
-    //     { [] } -- one range inside the other   OR   { [ } ] -- partial overlap.
-    return endsAfterKey(other.startKey) && other.endsAfterKey(startKey);
-  }
-
-  /**
-   * Returns a list of up to {@code numSplits + 1} {@link ByteKey ByteKeys} in ascending order,
-   * where the keys have been interpolated to form roughly equal sub-ranges of this
-   * {@link ByteKeyRange}, assuming a uniform distribution of keys within this range.
-   *
-   * <p>The first {@link ByteKey} in the result is guaranteed to be equal to {@link #getStartKey},
-   * and the last {@link ByteKey} in the result is guaranteed to be equal to {@link #getEndKey}.
-   * Thus the resulting list exactly spans the same key range as this {@link ByteKeyRange}.
-   *
-   * <p>Note that the number of keys returned is not always equal to {@code numSplits + 1}.
-   * Specifically, if this range is unsplittable (e.g., because the start and end keys are equal
-   * up to padding by zero bytes), the list returned will only contain the start and end key.
-   *
-   * @throws IllegalArgumentException if the specified number of splits is < 1
-   * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about split semantics.
-   */
-  public List<ByteKey> split(int numSplits) {
-    checkArgument(numSplits > 0, "numSplits %s must be a positive integer", numSplits);
-
-    try {
-      ImmutableList.Builder<ByteKey> ret = ImmutableList.builder();
-      ret.add(startKey);
-      for (int i = 1; i < numSplits; ++i) {
-        ret.add(interpolateKey(i / (double) numSplits));
-      }
-      ret.add(endKey);
-      return ret.build();
-    } catch (IllegalStateException e) {
-      // The range is not splittable -- just return
-      return ImmutableList.of(startKey, endKey);
-    }
-  }
-
-  /**
-   * Returns the fraction of this range {@code [startKey, endKey)} that is in the interval
-   * {@code [startKey, key)}.
-   *
-   * @throws IllegalArgumentException if {@code key} does not fall within this range
-   * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics.
-   */
-  public double estimateFractionForKey(ByteKey key) {
-    checkNotNull(key, "key");
-    checkArgument(!key.isEmpty(), "Cannot compute fraction for an empty key");
-    checkArgument(
-        key.compareTo(startKey) >= 0, "Expected key %s >= range start key %s", key, startKey);
-
-    if (key.equals(endKey)) {
-      return 1.0;
-    }
-    checkArgument(containsKey(key), "Cannot compute fraction for %s outside this %s", key, this);
-
-    byte[] startBytes = startKey.getBytes();
-    byte[] endBytes = endKey.getBytes();
-    byte[] keyBytes = key.getBytes();
-    // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other
-    // keys, to get a concrete least upper bound for the desired range.
-    if (endKey.isEmpty()) {
-      startBytes = addHeadByte(startBytes, (byte) 0);
-      endBytes = addHeadByte(endBytes, (byte) 1);
-      keyBytes = addHeadByte(keyBytes, (byte) 0);
-    }
-
-    // Pad to the longest of all 3 keys.
-    int paddedKeyLength = Math.max(Math.max(startBytes.length, endBytes.length), keyBytes.length);
-    BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength);
-    BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength);
-    BigInteger keyInt = paddedPositiveInt(keyBytes, paddedKeyLength);
-
-    // Keys are equal subject to padding by 0.
-    BigInteger range = rangeEndInt.subtract(rangeStartInt);
-    if (range.equals(BigInteger.ZERO)) {
-      logger.warn(
-          "Using 0.0 as the default fraction for this near-empty range {} where start and end keys"
-              + " differ only by trailing zeros.",
-          this);
-      return 0.0;
-    }
-
-    // Compute the progress (key-start)/(end-start) scaling by 2^64, dividing (which rounds),
-    // and then scaling down after the division. This gives ample precision when converted to
-    // double.
-    BigInteger progressScaled = keyInt.subtract(rangeStartInt).shiftLeft(64);
-    return progressScaled.divide(range).doubleValue() / Math.pow(2, 64);
-  }
-
-  /**
-   * Returns a {@link ByteKey} {@code key} such that {@code [startKey, key)} represents
-   * approximately the specified fraction of the range {@code [startKey, endKey)}. The interpolation
-   * is computed assuming a uniform distribution of keys.
-   *
-   * <p>For example, given the largest possible range (defined by empty start and end keys), the
-   * fraction {@code 0.5} will return the {@code ByteKey.of(0x80)}, which will also be returned for
-   * ranges {@code [0x40, 0xc0)} and {@code [0x6f, 0x91)}.
-   *
-   * <p>The key returned will never be empty.
-   *
-   * @throws IllegalArgumentException if {@code fraction} is outside the range [0, 1)
-   * @throws IllegalStateException if this range cannot be interpolated
-   * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics.
-   */
-  public ByteKey interpolateKey(double fraction) {
-    checkArgument(
-        fraction >= 0.0 && fraction < 1.0, "Fraction %s must be in the range [0, 1)", fraction);
-    byte[] startBytes = startKey.getBytes();
-    byte[] endBytes = endKey.getBytes();
-    // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other
-    // keys, to get a concrete least upper bound for the desired range.
-    if (endKey.isEmpty()) {
-      startBytes = addHeadByte(startBytes, (byte) 0);
-      endBytes = addHeadByte(endBytes, (byte) 1);
-    }
-
-    // Pad to the longest key.
-    int paddedKeyLength = Math.max(startBytes.length, endBytes.length);
-    BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength);
-    BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength);
-
-    // If the keys are equal subject to padding by 0, we can't interpolate.
-    BigInteger range = rangeEndInt.subtract(rangeStartInt);
-    checkState(
-        !range.equals(BigInteger.ZERO),
-        "Refusing to interpolate for near-empty %s where start and end keys differ only by trailing"
-            + " zero bytes.",
-        this);
-
-    // Add precision so that range is at least 53 (double mantissa length) bits long. This way, we
-    // can interpolate small ranges finely, e.g., split the range key 3 to key 4 into 1024 parts.
-    // We add precision to range by adding zero bytes to the end of the keys, aka shifting the
-    // underlying BigInteger left by a multiple of 8 bits.
-    int bytesNeeded = ((53 - range.bitLength()) + 7) / 8;
-    if (bytesNeeded > 0) {
-      range = range.shiftLeft(bytesNeeded * 8);
-      rangeStartInt = rangeStartInt.shiftLeft(bytesNeeded * 8);
-      paddedKeyLength += bytesNeeded;
-    }
-
-    BigInteger interpolatedOffset =
-        new BigDecimal(range).multiply(BigDecimal.valueOf(fraction)).toBigInteger();
-
-    int outputKeyLength = endKey.isEmpty() ? (paddedKeyLength - 1) : paddedKeyLength;
-    return ByteKey.copyFrom(
-        fixupHeadZeros(rangeStartInt.add(interpolatedOffset).toByteArray(), outputKeyLength));
-  }
-
-  /**
-   * Returns new {@link ByteKeyRange} like this one, but with the specified start key.
-   */
-  public ByteKeyRange withStartKey(ByteKey startKey) {
-    return new ByteKeyRange(startKey, endKey);
-  }
-
-  /**
-   * Returns new {@link ByteKeyRange} like this one, but with the specified end key.
-   */
-  public ByteKeyRange withEndKey(ByteKey endKey) {
-    return new ByteKeyRange(startKey, endKey);
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  private final ByteKey startKey;
-  private final ByteKey endKey;
-
-  private ByteKeyRange(ByteKey startKey, ByteKey endKey) {
-    this.startKey = checkNotNull(startKey, "startKey");
-    this.endKey = checkNotNull(endKey, "endKey");
-    checkArgument(endsAfterKey(startKey), "Start %s must be less than end %s", startKey, endKey);
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(ByteKeyRange.class)
-        .add("startKey", startKey)
-        .add("endKey", endKey)
-        .toString();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (!(o instanceof ByteKeyRange)) {
-      return false;
-    }
-    ByteKeyRange other = (ByteKeyRange) o;
-    return Objects.equals(startKey, other.startKey) && Objects.equals(endKey, other.endKey);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(startKey, endKey);
-  }
-
-  /**
-   * Returns a copy of the specified array with the specified byte added at the front.
-   */
-  private static byte[] addHeadByte(byte[] array, byte b) {
-    byte[] ret = new byte[array.length + 1];
-    ret[0] = b;
-    System.arraycopy(array, 0, ret, 1, array.length);
-    return ret;
-  }
-
-  /**
-   * Ensures the array is exactly {@code size} bytes long. Returns the input array if the condition
-   * is met, otherwise either adds or removes zero bytes from the beginning of {@code array}.
-   */
-  private static byte[] fixupHeadZeros(byte[] array, int size) {
-    int padding = size - array.length;
-    if (padding == 0) {
-      return array;
-    }
-
-    if (padding < 0) {
-      // There is one zero byte at the beginning, added by BigInteger to make there be a sign
-      // bit when converting to bytes.
-      verify(
-          padding == -1,
-          "key %s: expected length %d with exactly one byte of padding, found %d",
-          ByteKey.copyFrom(array),
-          size,
-          -padding);
-      verify(
-          (array[0] == 0) && ((array[1] & 0x80) == 0x80),
-          "key %s: is 1 byte longer than expected, indicating BigInteger padding. Expect first byte"
-              + " to be zero with set MSB in second byte.",
-          ByteKey.copyFrom(array));
-      return Arrays.copyOfRange(array, 1, array.length);
-    }
-
-    byte[] ret = new byte[size];
-    System.arraycopy(array, 0, ret, padding, array.length);
-    return ret;
-  }
-
-  /**
-   * Returns {@code true} when the specified {@code key} is smaller than this range's end key. The
-   * semantic change from {@code (key.compareTo(getEndKey()) < 0)} is that the empty end key is
-   * treated as larger than all possible {@link ByteKey keys}.
-   */
-  boolean endsAfterKey(ByteKey key) {
-    return endKey.isEmpty() || key.compareTo(endKey) < 0;
-  }
-
-  /** Builds a BigInteger out of the specified array, padded to the desired byte length. */
-  private static BigInteger paddedPositiveInt(byte[] bytes, int length) {
-    int bytePaddingNeeded = length - bytes.length;
-    checkArgument(
-        bytePaddingNeeded >= 0, "Required bytes.length %s < length %s", bytes.length, length);
-    BigInteger ret = new BigInteger(1, bytes);
-    return (bytePaddingNeeded == 0) ? ret : ret.shiftLeft(8 * bytePaddingNeeded);
-  }
-}
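
The interpolation machinery above is easiest to see on the full key space. A hedged sketch; the exact byte lengths of interpolated keys depend on the precision padding described in interpolateKey, so the comments only claim leading bytes:

    import com.google.cloud.dataflow.sdk.io.range.ByteKey;
    import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;

    import java.util.List;

    public class ByteKeyRangeDemo {
      public static void main(String[] args) {
        ByteKeyRange all = ByteKeyRange.ALL_KEYS; // empty start and end keys

        // 0.5 of the full key space: a key whose first byte is 0x80,
        // possibly followed by zero-byte padding.
        ByteKey mid = all.interpolateKey(0.5);

        // estimateFractionForKey approximately inverts interpolateKey.
        double fraction = all.estimateFractionForKey(mid); // ~0.5

        // Five boundary keys spanning four roughly equal sub-ranges; the first
        // and last are guaranteed to be the range's own start and end keys.
        List<ByteKey> bounds = all.split(4);
        System.out.println(bounds + " fraction=" + fraction);
      }
    }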

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
deleted file mode 100644
index f6796cc..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RangeTracker} for {@link ByteKey ByteKeys} in {@link ByteKeyRange ByteKeyRanges}.
- *
- * @see ByteKey
- * @see ByteKeyRange
- */
-public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
-  private static final Logger logger = LoggerFactory.getLogger(ByteKeyRangeTracker.class);
-
-  /** Instantiates a new {@link ByteKeyRangeTracker} with the specified range. */
-  public static ByteKeyRangeTracker of(ByteKeyRange range) {
-    return new ByteKeyRangeTracker(range);
-  }
-
-  @Override
-  public synchronized ByteKey getStartPosition() {
-    return range.getStartKey();
-  }
-
-  @Override
-  public synchronized ByteKey getStopPosition() {
-    return range.getEndKey();
-  }
-
-  @Override
-  public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
-    if (isAtSplitPoint && !range.containsKey(recordStart)) {
-      return false;
-    }
-    position = recordStart;
-    return true;
-  }
-
-  @Override
-  public synchronized boolean trySplitAtPosition(ByteKey splitPosition) {
-    // Unstarted.
-    if (position == null) {
-      logger.warn(
-          "{}: Rejecting split request at {} because no records have been returned.",
-          this,
-          splitPosition);
-      return false;
-    }
-
-    // Started, but not after current position.
-    if (splitPosition.compareTo(position) <= 0) {
-      logger.warn(
-          "{}: Rejecting split request at {} because it is not after current position {}.",
-          this,
-          splitPosition,
-          position);
-      return false;
-    }
-
-    // Sanity check.
-    if (!range.containsKey(splitPosition)) {
-      logger.warn(
-          "{}: Rejecting split request at {} because it is not within the range.",
-          this,
-          splitPosition);
-      return false;
-    }
-
-    range = range.withEndKey(splitPosition);
-    return true;
-  }
-
-  @Override
-  public synchronized double getFractionConsumed() {
-    if (position == null) {
-      return 0;
-    }
-    return range.estimateFractionForKey(position);
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////
-  private ByteKeyRange range;
-  @Nullable private ByteKey position;
-
-  private ByteKeyRangeTracker(ByteKeyRange range) {
-    this.range = range;
-    this.position = null;
-  }
-
-  @Override
-  public String toString() {
-    return toStringHelper(ByteKeyRangeTracker.class)
-        .add("range", range)
-        .add("position", position)
-        .toString();
-  }
-}
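
The tracker's split-rejection rules compose into a simple protocol: claim records first, then split strictly ahead of the last claimed position. A small sketch using only the API above:

    import com.google.cloud.dataflow.sdk.io.range.ByteKey;
    import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
    import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;

    public class ByteKeyRangeTrackerDemo {
      public static void main(String[] args) {
        ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);

        // Rejected: no record has been returned yet ("unstarted").
        boolean early = tracker.trySplitAtPosition(ByteKey.of(0x80)); // false

        // Claim a record at a split point inside the range.
        tracker.tryReturnRecordAt(true, ByteKey.of(0x10));            // true

        // Accepted: strictly after the current position and within the range.
        // The tracked range shrinks to [start, 0x80).
        boolean ok = tracker.trySplitAtPosition(ByteKey.of(0x80));    // true

        System.out.println(early + " " + ok + " " + tracker.getFractionConsumed());
      }
    }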

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
deleted file mode 100644
index b237217..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link RangeTracker} for non-negative positions of type {@code long}.
- */
-public class OffsetRangeTracker implements RangeTracker<Long> {
-  private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
-
-  private final long startOffset;
-  private long stopOffset;
-  private long lastRecordStart = -1L;
-  private long offsetOfLastSplitPoint = -1L;
-
-  /**
-   * Offset corresponding to infinity. This can only be used as the upper-bound of a range, and
-   * indicates reading all of the records until the end without specifying exactly what the end is.
-   *
-   * <p>Infinite ranges cannot be split because it is impossible to estimate progress within them.
-   */
-  public static final long OFFSET_INFINITY = Long.MAX_VALUE;
-
-  /**
-   * Creates an {@code OffsetRangeTracker} for the specified range.
-   */
-  public OffsetRangeTracker(long startOffset, long stopOffset) {
-    this.startOffset = startOffset;
-    this.stopOffset = stopOffset;
-  }
-
-  @Override
-  public synchronized Long getStartPosition() {
-    return startOffset;
-  }
-
-  @Override
-  public synchronized Long getStopPosition() {
-    return stopOffset;
-  }
-
-  @Override
-  public boolean tryReturnRecordAt(boolean isAtSplitPoint, Long recordStart) {
-    return tryReturnRecordAt(isAtSplitPoint, recordStart.longValue());
-  }
-
-  public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) {
-    if (lastRecordStart == -1 && !isAtSplitPoint) {
-      throw new IllegalStateException(
-          String.format("The first record [starting at %d] must be at a split point", recordStart));
-    }
-    if (recordStart < lastRecordStart) {
-      throw new IllegalStateException(
-          String.format(
-              "Trying to return record [starting at %d] "
-                  + "which is before the last-returned record [starting at %d]",
-              recordStart,
-              lastRecordStart));
-    }
-    if (isAtSplitPoint) {
-      if (offsetOfLastSplitPoint != -1L && recordStart == offsetOfLastSplitPoint) {
-        throw new IllegalStateException(
-            String.format(
-                "Record at a split point has same offset as the previous split point: "
-                    + "previous split point at %d, current record starts at %d",
-                offsetOfLastSplitPoint, recordStart));
-      }
-      if (recordStart >= stopOffset) {
-        return false;
-      }
-      offsetOfLastSplitPoint = recordStart;
-    }
-
-    lastRecordStart = recordStart;
-    return true;
-  }
-
-  @Override
-  public boolean trySplitAtPosition(Long splitOffset) {
-    return trySplitAtPosition(splitOffset.longValue());
-  }
-
-  public synchronized boolean trySplitAtPosition(long splitOffset) {
-    if (stopOffset == OFFSET_INFINITY) {
-      LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset);
-      return false;
-    }
-    if (lastRecordStart == -1) {
-      LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset);
-      return false;
-    }
-
-    // Note: technically it is correct to split at any position after the last returned
-    // split point, not just the last returned record.
-    // TODO: Investigate whether in practice this is useful or, rather, confusing.
-    if (splitOffset <= lastRecordStart) {
-      LOG.debug(
-          "Refusing to split {} at {}: already past proposed split position", this, splitOffset);
-      return false;
-    }
-    if (splitOffset < startOffset || splitOffset >= stopOffset) {
-      LOG.debug(
-          "Refusing to split {} at {}: proposed split position out of range", this, splitOffset);
-      return false;
-    }
-    LOG.debug("Agreeing to split {} at {}", this, splitOffset);
-    this.stopOffset = splitOffset;
-    return true;
-  }
-
-  /**
-   * Returns a position {@code P} such that the range {@code [start, P)} represents approximately
-   * the given fraction of the range {@code [start, end)}. Assumes that the density of records
-   * in the range is approximately uniform.
-   */
-  public synchronized long getPositionForFractionConsumed(double fraction) {
-    if (stopOffset == OFFSET_INFINITY) {
-      throw new IllegalArgumentException(
-          "getPositionForFractionConsumed is not applicable to an unbounded range: " + this);
-    }
-    return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset));
-  }
-
-  @Override
-  public synchronized double getFractionConsumed() {
-    if (stopOffset == OFFSET_INFINITY) {
-      return 0.0;
-    }
-    if (lastRecordStart == -1) {
-      return 0.0;
-    }
-    // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
-    // which is (4 - 3 + 1) / (6 - 3) = 67%.
-    // Also, clamp to at most 1.0 because the last consumed position can extend past the
-    // stop position.
-    return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
-  }
-
-  @Override
-  public synchronized String toString() {
-    String stopString = (stopOffset == OFFSET_INFINITY) ? "infinity" : String.valueOf(stopOffset);
-    if (lastRecordStart >= 0) {
-      return String.format(
-          "<at [starting at %d] of offset range [%d, %s)>",
-          lastRecordStart,
-          startOffset,
-          stopString);
-    } else {
-      return String.format("<unstarted in offset range [%d, %s)>", startOffset, stopString);
-    }
-  }
-
-  /**
-   * Returns a copy of this tracker for testing purposes (to simplify testing methods with
-   * side effects).
-   */
-  @VisibleForTesting
-  OffsetRangeTracker copy() {
-    OffsetRangeTracker res = new OffsetRangeTracker(startOffset, stopOffset);
-    res.lastRecordStart = this.lastRecordStart;
-    return res;
-  }
-}
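
The arithmetic in getFractionConsumed is worth making concrete. A sketch that replays the [3, 6) example from the comment in that method:

    import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;

    public class OffsetRangeTrackerDemo {
      public static void main(String[] args) {
        OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);

        // Every record in this toy source is a split point.
        tracker.tryReturnRecordAt(true, 3);
        tracker.tryReturnRecordAt(true, 4);

        // lastRecordStart is 4: offsets 3 and 4 of {3, 4, 5} are consumed,
        // so the fraction is (4 - 3 + 1) / (6 - 3) = 2/3.
        System.out.println(tracker.getFractionConsumed()); // 0.666...

        // Splitting at 5 succeeds (past the last record, within [3, 6));
        // the tracker's range becomes [3, 5).
        System.out.println(tracker.trySplitAtPosition(5L)); // true
      }
    }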

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
deleted file mode 100644
index 84359f1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-/**
- * A {@code RangeTracker} is a thread-safe helper object for implementing dynamic work rebalancing
- * in position-based {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader}
- * subclasses.
- *
- * <h3>Usage of the RangeTracker class hierarchy</h3>
- * The {@code RangeTracker} interface should not be used directly: all users should use one of its
- * subclasses. We declare it here because all subclasses have roughly the same interface
- * and the same properties, to centralize the documentation. Currently we provide two
- * implementations: {@link OffsetRangeTracker} and {@link ByteKeyRangeTracker}.
- *
- * <h3>Position-based sources</h3>
- * A position-based source is one where the source can be described by a range of positions of
- * an ordered type and the records returned by the reader can be described by positions of the
- * same type.
- *
- * <p>In case a record occupies a range of positions in the source, the most important thing about
- * the record is the position where it starts.
- *
- * <p>Defining the semantics of positions for a source is entirely up to the source class, however
- * the chosen definitions have to obey certain properties in order to make it possible to correctly
- * split the source into parts, including dynamic splitting. Two main aspects need to be defined:
- * <ul>
- *   <li>How to assign starting positions to records.
- *   <li>Which records should be read by a source with a range {@code [A, B)}.
- * </ul>
- * Moreover, reading a range must be <i>efficient</i>, i.e., the performance of reading a range
- * should not significantly depend on the location of the range. For example, reading the range
- * {@code [A, B)} should not require reading all data before {@code A}.
- *
- * <p>The sections below explain exactly what properties these definitions must satisfy, and
- * how to use a {@code RangeTracker} with a properly defined source.
- *
- * <h3>Properties of position-based sources</h3>
- * The main requirement for position-based sources is <i>associativity</i>: reading records from
- * {@code [A, B)} and records from {@code [B, C)} should give the same records as reading from
- * {@code [A, C)}, where {@code A <= B <= C}. This property ensures that no matter how a range
- * of positions is split into arbitrarily many sub-ranges, the total set of records described by
- * them stays the same.
- *
- * <p>The other important property is how the source's range relates to positions of records in
- * the source. In many sources each record can be identified by a unique starting position.
- * In this case:
- * <ul>
- *   <li>All records returned by a source {@code [A, B)} must have starting positions
- *   in this range.
- *   <li>All but the last record should end within this range. The last record may or may not
- *   extend past the end of the range.
- *   <li>Records should not overlap.
- * </ul>
- * Such sources should define "read {@code [A, B)}" as "read from the first record starting at or
- * after A, up to but not including the first record starting at or after B".
- *
- * <p>Some examples of such sources include reading lines or CSV from a text file, reading keys and
- * values from Google Cloud Bigtable, etc.
- *
- * <p>The concept of <i>split points</i> makes it possible to extend these definitions to sources
- * where some records cannot be identified by a unique starting position.
- *
- * <p>In all cases, all records returned by a source {@code [A, B)} must <i>start</i> at or after
- * {@code A}.
- *
- * <h3>Split points</h3>
- *
- * <p>Some sources may have records that are not directly addressable. For example, imagine a file
- * format consisting of a sequence of compressed blocks. Each block can be assigned an offset, but
- * records within the block cannot be directly addressed without decompressing the block. Let us
- * refer to this hypothetical format as <i>CBF (Compressed Blocks Format)</i>.
- *
- * <p>Many such formats can still satisfy the associativity property. For example, in CBF, reading
- * {@code [A, B)} can mean "read all the records in all blocks whose starting offset is in
- * {@code [A, B)}".
- *
- * <p>To support such complex formats, we introduce the notion of <i>split points</i>. We say that
- * a record is a split point if there exists a position {@code A} such that the record is the first
- * one to be returned when reading the range {@code [A, infinity)}. In CBF, the only split points
- * would be the first records in each block.
- *
- * <p>Split points allow us to define the meaning of a record's position and a source's range
- * in all cases:
- * <ul>
- *   <li>For a record that is at a split point, its position is defined to be the largest
- *   {@code A} such that reading a source with the range {@code [A, infinity)} returns this record;
- *   <li>Positions of other records are only required to be non-decreasing;
- *   <li>Reading the source {@code [A, B)} must return records starting from the first split point
- *   at or after {@code A}, up to but not including the first split point at or after {@code B}.
- *   In particular, this means that the first record returned by a source MUST always be
- *   a split point.
- *   <li>Positions of split points must be unique.
- * </ul>
- * As a result, for any decomposition of the full range of the source into position ranges, the
- * total set of records will be the full set of records in the source, and each record
- * will be read exactly once.
- *
- * <h3>Consumed positions</h3>
- * As the source is being read, and records read from it are being passed to the downstream
- * transforms in the pipeline, we say that positions in the source are being <i>consumed</i>.
- * When a reader has read a record (or promised to a caller that a record will be returned),
- * positions up to and including the record's start position are considered <i>consumed</i>.
- *
- * <p>Dynamic splitting can happen only at <i>unconsumed</i> positions. If the reader just
- * returned a record at offset 42 in a file, dynamic splitting can happen only at offset 43 or
- * beyond, as otherwise that record could be read twice (by the current reader and by a reader
- * of the task starting at 43).
- *
- * <h3>Example</h3>
- * The following example uses an {@link OffsetRangeTracker} to support dynamically splitting
- * a source with integer positions (offsets).
- * <pre> {@code
- *   class MyReader implements BoundedReader<Foo> {
- *     private MySource currentSource;
- *     private final OffsetRangeTracker tracker;
- *     ...
- *     MyReader(MySource source) {
- *       this.currentSource = source;
- *       this.tracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
- *     }
- *     ...
- *     boolean start() {
- *       ... (general logic for locating the first record) ...
- *       if (!tracker.tryReturnRecordAt(true, recordStartOffset)) return false;
- *       ... (any logic that depends on the record being returned, e.g. counting returned records)
- *       return true;
- *     }
- *     boolean advance() {
- *       ... (general logic for locating the next record) ...
- *       if (!tracker.tryReturnRecordAt(isAtSplitPoint, recordStartOffset)) return false;
- *       ... (any logic that depends on the record being returned, e.g. counting returned records)
- *       return true;
- *     }
- *
- *     double getFractionConsumed() {
- *       return tracker.getFractionConsumed();
- *     }
- *   }
- * } </pre>
- *
- * <h3>Usage with different models of iteration</h3>
- * When using this class to protect a
- * {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader}, follow the pattern
- * described above.
- *
- * <p>When using this class to protect iteration in the {@code hasNext()/next()}
- * model, consider the record consumed when {@code hasNext()} is about to return true, rather than
- * when {@code next()} is called, because {@code hasNext()} returning true is promising the caller
- * that {@code next()} will have an element to return - so {@link #trySplitAtPosition} must not
- * split the range in a way that would make the record promised by {@code hasNext()} belong to
- * a different range.
- *
- * <p>Also note that implementations of {@code hasNext()} need to ensure
- * that they call {@link #tryReturnRecordAt} only once even if {@code hasNext()} is called
- * repeatedly, due to the requirement on uniqueness of split point positions.
- *
- * @param <PositionT> Type of positions used by the source to define ranges and identify records.
- */
-public interface RangeTracker<PositionT> {
-  /**
-   * Returns the starting position of the current range, inclusive.
-   */
-  PositionT getStartPosition();
-
-  /**
-   * Returns the ending position of the current range, exclusive.
-   */
-  PositionT getStopPosition();
-
-  /**
-   * Atomically determines whether a record at the given position can be returned and updates
-   * internal state. In particular:
-   * <ul>
-   *   <li>If {@code isAtSplitPoint} is {@code true}, and {@code recordStart} is outside the current
-   *   range, returns {@code false};
-   *   <li>Otherwise, updates the last-consumed position to {@code recordStart} and returns
-   *   {@code true}.
-   * </ul>
-   * <p>This method MUST be called on all split point records. It may be called on every record.
-   */
-  boolean tryReturnRecordAt(boolean isAtSplitPoint, PositionT recordStart);
-
-  /**
-   * Atomically splits the current range [{@link #getStartPosition}, {@link #getStopPosition})
-   * into a "primary" part [{@link #getStartPosition}, {@code splitPosition})
-   * and a "residual" part [{@code splitPosition}, {@link #getStopPosition}), assuming the current
-   * last-consumed position is within [{@link #getStartPosition}, {@code splitPosition})
-   * (i.e., {@code splitPosition} has not been consumed yet).
-   *
-   * <p>Updates the current range to be the primary and returns {@code true}. This means that
-   * all further calls on the current object will interpret their arguments relative to the
-   * primary range.
-   *
-   * <p>If the split position has already been consumed, or if no {@link #tryReturnRecordAt} call
-   * has been made yet, returns {@code false}. The second condition prevents dynamic splitting
-   * during reader start-up.
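-   *
-   * <p>For example, a reader's {@code splitAtFraction} might use this method roughly as in the
-   * following sketch; {@code positionForFraction} and {@code subSource} are hypothetical helpers:
-   * <pre> {@code
-   *   MySource splitAtFraction(double fraction) {
-   *     long splitOffset = positionForFraction(fraction);  // map fraction to an offset
-   *     if (!tracker.trySplitAtPosition(splitOffset)) {
-   *       return null;  // split rejected; keep reading the current range
-   *     }
-   *     MySource residual = currentSource.subSource(splitOffset, currentSource.getEndOffset());
-   *     currentSource = currentSource.subSource(currentSource.getStartOffset(), splitOffset);
-   *     return residual;
-   *   }
-   * } </pre>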
-   */
-  boolean trySplitAtPosition(PositionT splitPosition);
-
-  /**
-   * Returns the approximate fraction of positions in the source that have been consumed by
-   * successful {@link #tryReturnRecordAt} calls, or 0.0 if no such calls have happened.
-   */
-  double getFractionConsumed();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
deleted file mode 100644
index beb77bf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Provides thread-safe helpers for implementing dynamic work rebalancing in position-based
- * bounded sources.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} to get started.
- */
-package com.google.cloud.dataflow.sdk.io.range;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
deleted file mode 100644
index 60d62d3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options that allow setting the application name.
- */
-public interface ApplicationNameOptions extends PipelineOptions {
-  /**
-   * Name of application, for display purposes.
-   *
-   * <p>Defaults to the name of the class that constructs the {@link PipelineOptions}
-   * via the {@link PipelineOptionsFactory}.
-   */
-  @Description("Name of application for display purposes. Defaults to the name of the class that "
-      + "constructs the PipelineOptions via the PipelineOptionsFactory.")
-  String getAppName();
-  void setAppName(String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
deleted file mode 100644
index ed4eb24..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Properties needed when using BigQuery with the Dataflow SDK.
- */
-@Description("Options that are used to configure BigQuery. See "
-    + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-  @Description("Temporary dataset for BigQuery table operations. "
-      + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
-  @Default.String("bigquery.googleapis.com/cloud_dataflow")
-  String getTempDatasetId();
-  void setTempDatasetId(String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
deleted file mode 100644
index 43a46b0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
-
-/**
- * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
- */
-@Description("Configure options on the BlockingDataflowPipelineRunner.")
-public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
-  /**
-   * Output stream for job status messages.
-   */
-  @Description("Where messages generated during execution of the Dataflow job will be output.")
-  @JsonIgnore
-  @Hidden
-  @Default.InstanceFactory(StandardOutputFactory.class)
-  PrintStream getJobMessageOutput();
-  void setJobMessageOutput(PrintStream value);
-
-  /**
-   * Returns a default of {@link System#out}.
-   */
-  public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
-    @Override
-    public PrintStream create(PipelineOptions options) {
-      return System.out;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
deleted file mode 100644
index 2e1ad94..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.clouddebugger.v2.model.Debuggee;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import javax.annotation.Nullable;
-
-/**
- * Options for controlling Cloud Debugger.
- */
-@Description("[Experimental] Used to configure the Cloud Debugger")
-@Experimental
-@Hidden
-public interface CloudDebuggerOptions {
-
-  /**
-   * Whether to enable the Cloud Debugger snapshot agent for the current job.
-   */
-  @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
-  boolean getEnableCloudDebugger();
-  void setEnableCloudDebugger(boolean enabled);
-
-  @Description("The Cloud Debugger debugee to associate with. This should not be set directly.")
-  @Hidden
-  @Nullable Debuggee getDebuggee();
-  void setDebuggee(Debuggee debuggee);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
deleted file mode 100644
index cadc011..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
-import com.google.cloud.dataflow.sdk.util.GcsStager;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.Stager;
-import com.google.cloud.dataflow.sdk.util.Transport;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Internal. Options used to control execution of the Dataflow SDK for
- * debugging and testing purposes.
- */
-@Description("[Internal] Options used to control execution of the Dataflow SDK for "
-    + "debugging and testing purposes.")
-@Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
-  /**
-   * The list of backend experiments to enable.
-   *
-   * <p>Dataflow provides a number of experimental features that can be enabled
-   * with this flag.
-   *
-   * <p>Please sync with the Dataflow team before enabling any experiments.
-   */
-  @Description("[Experimental] Dataflow provides a number of experimental features that can "
-      + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
-      + "experiments.")
-  @Experimental
-  List<String> getExperiments();
-  void setExperiments(List<String> value);
-
-  /**
-   * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
-   * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
-   * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
-   */
-  @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
-      + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
-      + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
-  @Default.String(Dataflow.DEFAULT_ROOT_URL)
-  String getApiRootUrl();
-  void setApiRootUrl(String value);
-
-  /**
-   * Dataflow endpoint to use.
-   *
-   * <p>Defaults to the current version of the Google Cloud Dataflow
-   * API, at the time the current SDK version was released.
-   *
-   * <p>If the string contains "://", then this is treated as a URL,
-   * otherwise {@link #getApiRootUrl()} is used as the root
-   * URL.
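-   *
-   * <p>For example (values hypothetical), {@code "https://dataflow.googleapis.com/"} contains
-   * "://" and would be used as the complete URL, while {@code "dataflow/v1b3/projects/"} would
-   * be resolved relative to {@link #getApiRootUrl()}.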
-   */
-  @Description("The URL for the Dataflow API. If the string contains \"://\", this"
-      + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
-  @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
-  String getDataflowEndpoint();
-  void setDataflowEndpoint(String value);
-
-  /**
-   * The path that the translated Dataflow job specification is written to
-   * at job submission time. The job specification is represented in JSON
-   * format.
-   */
-  @Description("The path to write the translated Dataflow job specification out to "
-      + "at job submission time. The Dataflow job specification will be represented in JSON "
-      + "format.")
-  String getDataflowJobFile();
-  void setDataflowJobFile(String value);
-
-  /**
-   * The class of the validator that should be created and used to validate paths.
-   * If pathValidator has not been set explicitly, an instance of this class will be
-   * constructed and used as the path validator.
-   */
-  @Description("The class of the validator that should be created and used to validate paths. "
-      + "If pathValidator has not been set explicitly, an instance of this class will be "
-      + "constructed and used as the path validator.")
-  @Default.Class(DataflowPathValidator.class)
-  Class<? extends PathValidator> getPathValidatorClass();
-  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
-  /**
-   * The path validator instance that should be used to validate paths.
-   * If no path validator has been set explicitly, the default is to use the instance factory that
-   * constructs a path validator based upon the currently set pathValidatorClass.
-   */
-  @JsonIgnore
-  @Description("The path validator instance that should be used to validate paths. "
-      + "If no path validator has been set explicitly, the default is to use the instance factory "
-      + "that constructs a path validator based upon the currently set pathValidatorClass.")
-  @Default.InstanceFactory(PathValidatorFactory.class)
-  PathValidator getPathValidator();
-  void setPathValidator(PathValidator validator);
-
-  /**
-   * The class responsible for staging resources to be accessible by workers
-   * during job execution. If stager has not been set explicitly, an instance of this class
-   * will be created and used as the resource stager.
-   */
-  @Description("The class of the stager that should be created and used to stage resources. "
-      + "If stager has not been set explicitly, an instance of the this class will be created "
-      + "and used as the resource stager.")
-  @Default.Class(GcsStager.class)
-  Class<? extends Stager> getStagerClass();
-  void setStagerClass(Class<? extends Stager> stagerClass);
-
-  /**
-   * The resource stager instance that should be used to stage resources.
-   * If no stager has been set explicitly, the default is to use the instance factory
-   * that constructs a resource stager based upon the currently set stagerClass.
-   */
-  @JsonIgnore
-  @Description("The resource stager instance that should be used to stage resources. "
-      + "If no stager has been set explicitly, the default is to use the instance factory "
-      + "that constructs a resource stager based upon the currently set stagerClass.")
-  @Default.InstanceFactory(StagerFactory.class)
-  Stager getStager();
-  void setStager(Stager stager);
-
-  /**
-   * An instance of the Dataflow client. Defaults to creating a Dataflow client
-   * using the current set of options.
-   */
-  @JsonIgnore
-  @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
-      + "using the current set of options.")
-  @Default.InstanceFactory(DataflowClientFactory.class)
-  Dataflow getDataflowClient();
-  void setDataflowClient(Dataflow value);
-
-  /** Returns the default Dataflow client built from the passed in PipelineOptions. */
-  public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
-    @Override
-    public Dataflow create(PipelineOptions options) {
-        return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build();
-    }
-  }
-
-  /**
-   * Root URL for use with the Pubsub API.
-   */
-  @Description("Root URL for use with the Pubsub API")
-  @Default.String("https://pubsub.googleapis.com")
-  String getPubsubRootUrl();
-  void setPubsubRootUrl(String value);
-
-  /**
-   * Whether to update the currently running pipeline with the same name as this one.
-   *
-   * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
-   */
-  @Deprecated
-  @Description("If set, replace the existing pipeline with the name specified by --jobName with "
-      + "this pipeline, preserving state.")
-  boolean getUpdate();
-  @Deprecated
-  void setUpdate(boolean value);
-
-  /**
-   * Mapping of old PTransform names to new ones, specified as JSON
-   * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
-   * empty string.
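-   *
-   * <p>For example (transform names hypothetical):
-   * <pre>{@code
-   * --transformNameMapping={"CountWords":"CountWordsV2","OldDebugOutput":""}
-   * }</pre>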
-   */
-  @JsonIgnore
-  @Description(
-      "Mapping of old PTranform names to new ones, specified as JSON "
-      + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
-      + "string.")
-  Map<String, String> getTransformNameMapping();
-  void setTransformNameMapping(Map<String, String> value);
-
-  /**
-   * Custom windmill_main binary to use with the streaming runner.
-   */
-  @Description("Custom windmill_main binary to use with the streaming runner")
-  String getOverrideWindmillBinary();
-  void setOverrideWindmillBinary(String value);
-
-  /**
-   * Number of threads to use on the Dataflow worker harness. If left unspecified,
-   * the Dataflow service will compute an appropriate number of threads to use.
-   */
-  @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
-      + "the Dataflow service will compute an appropriate number of threads to use.")
-  int getNumberOfWorkerHarnessThreads();
-  void setNumberOfWorkerHarnessThreads(int value);
-
-  /**
-   * If {@literal true}, save a heap dump before killing a thread or process which is GC
-   * thrashing or out of memory. The location of the heap file will either be echoed back
-   * to the user, or the user will be given the opportunity to download the heap file.
-   *
-   * <p>CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider
-   * increasing the boot disk size before setting this flag to true.
-   */
-  @Description("If {@literal true}, save a heap dump before killing a thread or process "
-      + "which is GC thrashing or out of memory.")
-  boolean getDumpHeapOnOOM();
-  void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
-
-  /**
-   * Creates a {@link PathValidator} object using the class specified in
-   * {@link #getPathValidatorClass()}.
-   */
-  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
-    @Override
-    public PathValidator create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(PathValidator.class)
-          .fromClass(debugOptions.getPathValidatorClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-
-  /**
-   * Creates a {@link Stager} object using the class specified in
-   * {@link #getStagerClass()}.
-   */
-  public static class StagerFactory implements DefaultValueFactory<Stager> {
-    @Override
-    public Stager create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(Stager.class)
-          .fromClass(debugOptions.getStagerClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
deleted file mode 100644
index 1aa4342..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-/**
- * Options that can be used to configure the {@link DataflowPipeline}.
- */
-@Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
-    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
-    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
-    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
-    DataflowProfilingOptions {
-
-  static final String DATAFLOW_STORAGE_LOCATION = "Dataflow Storage Location";
-
-  @Description("Project id. Required when running a Dataflow in the cloud. "
-      + "See https://cloud.google.com/storage/docs/projects for further details.")
-  @Override
-  @Validation.Required
-  @Default.InstanceFactory(DefaultProjectFactory.class)
-  String getProject();
-  @Override
-  void setProject(String value);
-
-  /**
-   * GCS path for temporary files, e.g. gs://bucket/object
-   *
-   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
-   *
-   * <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
-   * {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
-   * {@link #getStagingLocation()}.
-   */
-  @Description("GCS path for temporary files, eg \"gs://bucket/object\". "
-      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
-      + "At least one of tempLocation or stagingLocation must be set. If tempLocation is unset, "
-      + "defaults to using stagingLocation.")
-  @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
-  String getTempLocation();
-  void setTempLocation(String value);
-
-  /**
-   * GCS path for staging local files, e.g. gs://bucket/object
-   *
-   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
-   *
-   * <p>At least one of {@link #getStagingLocation()} or {@link #getTempLocation()} must be set. If
-   * {@link #getStagingLocation()} is not set, then the Dataflow pipeline defaults to using
-   * {@link #getTempLocation()}.
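-   *
-   * <p>For example (bucket name hypothetical):
-   * <pre>{@code
-   * --stagingLocation=gs://my-bucket/staging
-   * }</pre>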
-   */
-  @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
-      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
-      + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
-      + "defaults to using tempLocation.")
-  @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
-  String getStagingLocation();
-  void setStagingLocation(String value);
-
-  /**
-   * The Dataflow job name is used as an idempotence key within the Dataflow service.
-   * While a job with a given name is active, another active job with the same name
-   * cannot be created. Defaults to ApplicationName-UserName-Date.
-   */
-  @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
-      + "While a job with a given name is active, another active job with the same name cannot "
-      + "be created. Defaults to ApplicationName-UserName-Date.")
-  @Default.InstanceFactory(JobNameFactory.class)
-  String getJobName();
-  void setJobName(String value);
-
-  /**
-   * Whether to update the currently running pipeline with the same name as this one.
-   */
-  @Override
-  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
-  @Description(
-      "If set, replace the existing pipeline with the name specified by --jobName with "
-          + "this pipeline, preserving state.")
-  boolean getUpdate();
-  @Override
-  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
-  void setUpdate(boolean value);
-
-  /**
-   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
-   * local system user name (if available), and the current time. The normalization ensures that
-   * the job name matches the required pattern [a-z]([-a-z0-9]*[a-z0-9])? and stays within the
-   * 40-character length limit.
-   *
-   * <p>This job name factory is only able to generate one unique name per second per application
-   * and user combination.
-   */
-  public static class JobNameFactory implements DefaultValueFactory<String> {
-    private static final DateTimeFormatter FORMATTER =
-        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
-    @Override
-    public String create(PipelineOptions options) {
-      String appName = options.as(ApplicationNameOptions.class).getAppName();
-      String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
-          : appName.toLowerCase()
-                   .replaceAll("[^a-z0-9]", "0")
-                   .replaceAll("^[^a-z]", "a");
-      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
-      String normalizedUserName = userName.toLowerCase()
-                                          .replaceAll("[^a-z0-9]", "0");
-      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
-      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
-    }
-  }
-}