Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:03 UTC
[39/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
deleted file mode 100644
index 5ab8582..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
-import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import io.grpc.Status.Code;
-import io.grpc.StatusRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
- * service.
- */
-class BigtableServiceImpl implements BigtableService {
- private static final Logger logger = LoggerFactory.getLogger(BigtableServiceImpl.class);
-
- public BigtableServiceImpl(BigtableOptions options) {
- this.options = options;
- }
-
- private final BigtableOptions options;
-
- @Override
- public BigtableWriterImpl openForWriting(String tableId) throws IOException {
- BigtableSession session = new BigtableSession(options);
- String tableName = options.getClusterName().toTableNameStr(tableId);
- return new BigtableWriterImpl(session, tableName);
- }
-
- @Override
- public boolean tableExists(String tableId) throws IOException {
- if (!BigtableSession.isAlpnProviderEnabled()) {
- logger.info(
- "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
- + " configured.",
- tableId,
- options);
- return true;
- }
-
- try (BigtableSession session = new BigtableSession(options)) {
- GetTableRequest getTable =
- GetTableRequest.newBuilder()
- .setName(options.getClusterName().toTableNameStr(tableId))
- .build();
- session.getTableAdminClient().getTable(getTable);
- return true;
- } catch (StatusRuntimeException e) {
- if (e.getStatus().getCode() == Code.NOT_FOUND) {
- return false;
- }
- String message =
- String.format(
- "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
- logger.error(message, e);
- throw new IOException(message, e);
- }
- }
-
- private class BigtableReaderImpl implements Reader {
- private BigtableSession session;
- private final BigtableSource source;
- private ResultScanner<Row> results;
- private Row currentRow;
-
- public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
- this.session = session;
- this.source = source;
- }
-
- @Override
- public boolean start() throws IOException {
- RowRange range =
- RowRange.newBuilder()
- .setStartKey(source.getRange().getStartKey().getValue())
- .setEndKey(source.getRange().getEndKey().getValue())
- .build();
- ReadRowsRequest.Builder requestB =
- ReadRowsRequest.newBuilder()
- .setRowRange(range)
- .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
- }
- results = session.getDataClient().readRows(requestB.build());
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- currentRow = results.next();
- return (currentRow != null);
- }
-
- @Override
- public void close() throws IOException {
- // Goal: by the end of this function, both results and session are null and closed,
- // independent of what errors they throw or prior state.
-
- if (session == null) {
- // Only possible when previously closed, so we know that results is also null.
- return;
- }
-
- // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
- // the Closer, but we can use the Closer to simplify the error handling.
- try (Closer closer = Closer.create()) {
- if (results != null) {
- closer.register(results);
- results = null;
- }
-
- session.close();
- } finally {
- session = null;
- }
- }
-
- @Override
- public Row getCurrentRow() throws NoSuchElementException {
- if (currentRow == null) {
- throw new NoSuchElementException();
- }
- return currentRow;
- }
- }
-
- private static class BigtableWriterImpl implements Writer {
- private BigtableSession session;
- private AsyncExecutor executor;
- private final MutateRowRequest.Builder partialBuilder;
-
- public BigtableWriterImpl(BigtableSession session, String tableName) {
- this.session = session;
- this.executor =
- new AsyncExecutor(
- session.getDataClient(),
- new HeapSizeManager(
- AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
- AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
- partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (executor != null) {
- executor.flush();
- executor = null;
- }
- } finally {
- if (session != null) {
- session.close();
- session = null;
- }
- }
- }
-
- @Override
- public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
- throws IOException {
- MutateRowRequest r =
- partialBuilder
- .clone()
- .setRowKey(record.getKey())
- .addAllMutations(record.getValue())
- .build();
- try {
- return executor.mutateRowAsync(r);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Write interrupted", e);
- }
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects
- .toStringHelper(BigtableServiceImpl.class)
- .add("options", options)
- .toString();
- }
-
- @Override
- public Reader createReader(BigtableSource source) throws IOException {
- BigtableSession session = new BigtableSession(options);
- return new BigtableReaderImpl(session, source);
- }
-
- @Override
- public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
- try (BigtableSession session = new BigtableSession(options)) {
- SampleRowKeysRequest request =
- SampleRowKeysRequest.newBuilder()
- .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
- .build();
- return session.getDataClient().sampleRowKeys(request);
- }
- }
-}
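
The close() method of BigtableReaderImpl above combines a Guava Closer with a manual close of the AutoCloseable session so that both resources end up closed and null regardless of prior state. A minimal, self-contained sketch of that pattern, using placeholder Closeable/AutoCloseable fields rather than the real Bigtable types:

import com.google.common.io.Closer;

import java.io.Closeable;
import java.io.IOException;

class ReaderCloseSketch {
  private Closeable results;      // placeholder for ResultScanner<Row>
  private AutoCloseable session;  // placeholder for BigtableSession

  ReaderCloseSketch(Closeable results, AutoCloseable session) {
    this.results = results;
    this.session = session;
  }

  public void close() throws IOException {
    if (session == null) {
      return; // already closed, so results is null too
    }
    // Closer only accepts Closeable, so the AutoCloseable session is closed
    // inside the try body while Closer guarantees results is closed even if
    // session.close() throws.
    try (Closer closer = Closer.create()) {
      if (results != null) {
        closer.register(results);
        results = null;
      }
      try {
        session.close();
      } catch (Exception e) {
        throw new IOException("Failed to close session", e);
      }
    } finally {
      session = null;
    }
  }
}
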
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
deleted file mode 100644
index 112a954..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines transforms for reading and writing from Google Cloud Bigtable.
- *
- * @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
deleted file mode 100644
index de0bd86..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines transforms for reading and writing common storage formats, including
- * {@link com.google.cloud.dataflow.sdk.io.AvroIO},
- * {@link com.google.cloud.dataflow.sdk.io.BigQueryIO}, and
- * {@link com.google.cloud.dataflow.sdk.io.TextIO}.
- *
- * <p>The classes in this package provide {@code Read} transforms that create PCollections
- * from existing storage:
- * <pre>{@code
- * PCollection<TableRow> inputData = pipeline.apply(
- * BigQueryIO.Read.named("Read")
- * .from("clouddataflow-readonly:samples.weather_stations");
- * }</pre>
- * and {@code Write} transforms that persist PCollections to external storage:
- * <pre> {@code
- * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- * .to("gs://my_bucket/path/to/numbers"));
- * } </pre>
- */
-package com.google.cloud.dataflow.sdk.io;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
deleted file mode 100644
index 30772da..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ByteString.ByteIterator;
-
-import java.io.Serializable;
-
-/**
- * A class representing a key consisting of an array of bytes. Arbitrary-length
- * {@code byte[]} keys are typical in key-value stores such as Google Cloud Bigtable.
- *
- * <p>Instances of {@link ByteKey} are immutable.
- *
- * <p>{@link ByteKey} implements {@link Comparable Comparable<ByteKey>} by comparing the
- * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the successor
- * to a key is the same key with an additional 0 byte appended; and keys have unbounded size.
- *
- * <p>Note that the empty {@link ByteKey} compares smaller than all other keys, but some systems
- * have the semantic that when an empty {@link ByteKey} is used as an upper bound, it represents
- * the largest possible key. In these cases, implementors should use {@link #isEmpty} to test
- * whether an upper bound key is empty.
- */
-public final class ByteKey implements Comparable<ByteKey>, Serializable {
- /** An empty key. */
- public static final ByteKey EMPTY = ByteKey.of();
-
- /**
- * Creates a new {@link ByteKey} backed by the specified {@link ByteString}.
- */
- public static ByteKey of(ByteString value) {
- return new ByteKey(value);
- }
-
- /**
- * Creates a new {@link ByteKey} backed by a copy of the specified {@code byte[]}.
- *
- * <p>Makes a copy of the underlying array.
- */
- public static ByteKey copyFrom(byte[] bytes) {
- return of(ByteString.copyFrom(bytes));
- }
-
- /**
- * Creates a new {@link ByteKey} backed by a copy of the specified {@code int[]}. This method is
- * primarily used as a convenience to create a {@link ByteKey} in code without casting down to
- * signed Java {@link Byte bytes}:
- *
- * <pre>{@code
- * ByteKey key = ByteKey.of(0xde, 0xad, 0xbe, 0xef);
- * }</pre>
- *
- * <p>Makes a copy of the input.
- */
- public static ByteKey of(int... bytes) {
- byte[] ret = new byte[bytes.length];
- for (int i = 0; i < bytes.length; ++i) {
- ret[i] = (byte) (bytes[i] & 0xff);
- }
- return ByteKey.copyFrom(ret);
- }
-
- /**
- * Returns an immutable {@link ByteString} representing this {@link ByteKey}.
- *
- * <p>Does not copy.
- */
- public ByteString getValue() {
- return value;
- }
-
- /**
- * Returns a newly-allocated {@code byte[]} representing this {@link ByteKey}.
- *
- * <p>Copies the underlying {@code byte[]}.
- */
- public byte[] getBytes() {
- return value.toByteArray();
- }
-
- /**
- * Returns {@code true} if the {@code byte[]} backing this {@link ByteKey} is of length 0.
- */
- public boolean isEmpty() {
- return value.isEmpty();
- }
-
- /**
- * {@link ByteKey} implements {@link Comparable Comparable<ByteKey>} by comparing the
- * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the
- * successor to a key is the same key with an additional 0 byte appended; and keys have unbounded
- * size.
- */
- @Override
- public int compareTo(ByteKey other) {
- checkNotNull(other, "other");
- ByteIterator thisIt = value.iterator();
- ByteIterator otherIt = other.value.iterator();
- while (thisIt.hasNext() && otherIt.hasNext()) {
- // (byte & 0xff) converts [-128,127] bytes to [0,255] ints.
- int cmp = (thisIt.nextByte() & 0xff) - (otherIt.nextByte() & 0xff);
- if (cmp != 0) {
- return cmp;
- }
- }
- // If we get here, the prefix of both arrays is equal up to the shorter array. The array with
- // more bytes is larger.
- return value.size() - other.value.size();
- }
-
- ////////////////////////////////////////////////////////////////////////////////////
- private final ByteString value;
-
- private ByteKey(ByteString value) {
- this.value = value;
- }
-
- /** Array used as a helper in {@link #toString}. */
- private static final char[] HEX =
- new char[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
-
- // Prints the key as a string "[deadbeef]".
- @Override
- public String toString() {
- char[] encoded = new char[2 * value.size() + 2];
- encoded[0] = '[';
- int cnt = 1;
- ByteIterator iterator = value.iterator();
- while (iterator.hasNext()) {
- byte b = iterator.nextByte();
- encoded[cnt] = HEX[(b & 0xF0) >>> 4];
- ++cnt;
- encoded[cnt] = HEX[b & 0xF];
- ++cnt;
- }
- encoded[cnt] = ']';
- return new String(encoded);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (!(o instanceof ByteKey)) {
- return false;
- }
- ByteKey other = (ByteKey) o;
- return (other.value.size() == value.size()) && this.compareTo(other) == 0;
- }
-
- @Override
- public int hashCode() {
- return value.hashCode();
- }
-}
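
The ordering semantics in the Javadoc above are easy to check directly. A short sketch, assuming the ByteKey class just shown is on the classpath:

import com.google.cloud.dataflow.sdk.io.range.ByteKey;

public class ByteKeyOrderingDemo {
  public static void main(String[] args) {
    ByteKey key = ByteKey.of(0xde, 0xad, 0xbe, 0xef);

    // The empty key is the smallest possible key.
    System.out.println(ByteKey.EMPTY.compareTo(key) < 0); // true

    // Appending a zero byte yields the key's successor.
    System.out.println(
        key.compareTo(ByteKey.of(0xde, 0xad, 0xbe, 0xef, 0x00)) < 0); // true

    // Bytes compare as unsigned values, so 0xff sorts after 0x01.
    System.out.println(ByteKey.of(0x01).compareTo(ByteKey.of(0xff)) < 0); // true

    // toString renders the key as bracketed hex.
    System.out.println(key); // [deadbeef]
  }
}
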
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
deleted file mode 100644
index 6f58d39..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * A class representing a range of {@link ByteKey ByteKeys}.
- *
- * <p>Instances of {@link ByteKeyRange} are immutable.
- *
- * <p>A {@link ByteKeyRange} enforces the restriction that its start and end keys must form a valid,
- * non-empty range {@code [startKey, endKey)} that is inclusive of the start key and exclusive of
- * the end key.
- *
- * <p>When the end key is empty, it is treated as the largest possible key.
- *
- * <h3>Interpreting {@link ByteKey} in a {@link ByteKeyRange}</h3>
- *
- * <p>The primary role of {@link ByteKeyRange} is to provide functionality for
- * {@link #estimateFractionForKey(ByteKey)}, {@link #interpolateKey(double)}, and
- * {@link #split(int)}, which are used for Google Cloud Dataflow's
- * <a href="https://cloud.google.com/dataflow/service/dataflow-service-desc#AutoScaling">Autoscaling
- * and Dynamic Work Rebalancing</a> features.
- *
- * <p>{@link ByteKeyRange} implements these features by treating a {@link ByteKey}'s underlying
- * {@code byte[]} as the binary expansion of floating point numbers in the range {@code [0.0, 1.0]}.
- * For example, the keys {@code ByteKey.of(0x80)}, {@code ByteKey.of(0xc0)}, and
- * {@code ByteKey.of(0xe0)} are interpreted as {@code 0.5}, {@code 0.75}, and {@code 0.875}
- * respectively. The empty {@code ByteKey.EMPTY} is interpreted as {@code 0.0} when used as the
- * start of a range and {@code 1.0} when used as the end key.
- *
- * <p>Key interpolation, fraction estimation, and range splitting are all interpreted in these
- * floating-point semantics. See the respective implementations for further details. <b>Note:</b>
- * the underlying implementations of these functions use {@link BigInteger} and {@link BigDecimal},
- * so they can be slow and should not be called in hot loops. Dataflow's dynamic work
- * rebalancing will only invoke these functions during periodic control operations, so they are not
- * called on the critical path.
- *
- * @see ByteKey
- */
-public final class ByteKeyRange implements Serializable {
- private static final Logger logger = LoggerFactory.getLogger(ByteKeyRange.class);
-
- /** The range of all keys, with empty start and end keys. */
- public static final ByteKeyRange ALL_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.EMPTY);
-
- /**
- * Creates a new {@link ByteKeyRange} with the given start and end keys.
- *
- * <p>Note that if {@code endKey} is empty, it is treated as the largest possible key.
- *
- * @see ByteKeyRange
- *
- * @throws IllegalArgumentException if {@code endKey} is less than or equal to {@code startKey},
- * unless {@code endKey} is empty indicating the maximum possible {@link ByteKey}.
- */
- public static ByteKeyRange of(ByteKey startKey, ByteKey endKey) {
- return new ByteKeyRange(startKey, endKey);
- }
-
- /**
- * Returns the {@link ByteKey} representing the lower bound of this {@link ByteKeyRange}.
- */
- public ByteKey getStartKey() {
- return startKey;
- }
-
- /**
- * Returns the {@link ByteKey} representing the upper bound of this {@link ByteKeyRange}.
- *
- * <p>Note that if {@code endKey} is empty, it is treated as the largest possible key.
- */
- public ByteKey getEndKey() {
- return endKey;
- }
-
- /**
- * Returns {@code true} if the specified {@link ByteKey} is contained within this range.
- */
- public Boolean containsKey(ByteKey key) {
- return key.compareTo(startKey) >= 0 && endsAfterKey(key);
- }
-
- /**
- * Returns {@code true} if the specified {@link ByteKeyRange} overlaps this range.
- */
- public Boolean overlaps(ByteKeyRange other) {
- // If each range starts before the other range ends, then they must overlap.
- // { [] } -- one range inside the other OR { [ } ] -- partial overlap.
- return endsAfterKey(other.startKey) && other.endsAfterKey(startKey);
- }
-
- /**
- * Returns a list of up to {@code numSplits + 1} {@link ByteKey ByteKeys} in ascending order,
- * where the keys have been interpolated to form roughly equal sub-ranges of this
- * {@link ByteKeyRange}, assuming a uniform distribution of keys within this range.
- *
- * <p>The first {@link ByteKey} in the result is guaranteed to be equal to {@link #getStartKey},
- * and the last {@link ByteKey} in the result is guaranteed to be equal to {@link #getEndKey}.
- * Thus the resulting list exactly spans the same key range as this {@link ByteKeyRange}.
- *
- * <p>Note that the number of keys returned is not always equal to {@code numSplits + 1}.
- * Specifically, if this range is unsplittable (e.g., because the start and end keys are equal
- * up to padding by zero bytes), the list returned will only contain the start and end key.
- *
- * @throws IllegalArgumentException if the specified number of splits is < 1
- * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about split semantics.
- */
- public List<ByteKey> split(int numSplits) {
- checkArgument(numSplits > 0, "numSplits %s must be a positive integer", numSplits);
-
- try {
- ImmutableList.Builder<ByteKey> ret = ImmutableList.builder();
- ret.add(startKey);
- for (int i = 1; i < numSplits; ++i) {
- ret.add(interpolateKey(i / (double) numSplits));
- }
- ret.add(endKey);
- return ret.build();
- } catch (IllegalStateException e) {
- // The range is not splittable -- just return
- return ImmutableList.of(startKey, endKey);
- }
- }
-
- /**
- * Returns the fraction of this range {@code [startKey, endKey)} that is in the interval
- * {@code [startKey, key)}.
- *
- * @throws IllegalArgumentException if {@code key} does not fall within this range
- * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics.
- */
- public double estimateFractionForKey(ByteKey key) {
- checkNotNull(key, "key");
- checkArgument(!key.isEmpty(), "Cannot compute fraction for an empty key");
- checkArgument(
- key.compareTo(startKey) >= 0, "Expected key %s >= range start key %s", key, startKey);
-
- if (key.equals(endKey)) {
- return 1.0;
- }
- checkArgument(containsKey(key), "Cannot compute fraction for %s outside this %s", key, this);
-
- byte[] startBytes = startKey.getBytes();
- byte[] endBytes = endKey.getBytes();
- byte[] keyBytes = key.getBytes();
- // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other
- // keys, to get a concrete least upper bound for the desired range.
- if (endKey.isEmpty()) {
- startBytes = addHeadByte(startBytes, (byte) 0);
- endBytes = addHeadByte(endBytes, (byte) 1);
- keyBytes = addHeadByte(keyBytes, (byte) 0);
- }
-
- // Pad to the longest of all 3 keys.
- int paddedKeyLength = Math.max(Math.max(startBytes.length, endBytes.length), keyBytes.length);
- BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength);
- BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength);
- BigInteger keyInt = paddedPositiveInt(keyBytes, paddedKeyLength);
-
- // Keys are equal subject to padding by 0.
- BigInteger range = rangeEndInt.subtract(rangeStartInt);
- if (range.equals(BigInteger.ZERO)) {
- logger.warn(
- "Using 0.0 as the default fraction for this near-empty range {} where start and end keys"
- + " differ only by trailing zeros.",
- this);
- return 0.0;
- }
-
- // Compute the progress (key-start)/(end-start) scaling by 2^64, dividing (which rounds),
- // and then scaling down after the division. This gives ample precision when converted to
- // double.
- BigInteger progressScaled = keyInt.subtract(rangeStartInt).shiftLeft(64);
- return progressScaled.divide(range).doubleValue() / Math.pow(2, 64);
- }
-
- /**
- * Returns a {@link ByteKey} {@code key} such that {@code [startKey, key)} represents
- * approximately the specified fraction of the range {@code [startKey, endKey)}. The interpolation
- * is computed assuming a uniform distribution of keys.
- *
- * <p>For example, given the largest possible range (defined by empty start and end keys), the
- * fraction {@code 0.5} will return the {@code ByteKey.of(0x80)}, which will also be returned for
- * ranges {@code [0x40, 0xc0)} and {@code [0x6f, 0x91)}.
- *
- * <p>The key returned will never be empty.
- *
- * @throws IllegalArgumentException if {@code fraction} is outside the range [0, 1)
- * @throws IllegalStateException if this range cannot be interpolated
- * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics.
- */
- public ByteKey interpolateKey(double fraction) {
- checkArgument(
- fraction >= 0.0 && fraction < 1.0, "Fraction %s must be in the range [0, 1)", fraction);
- byte[] startBytes = startKey.getBytes();
- byte[] endBytes = endKey.getBytes();
- // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other
- // keys, to get a concrete least upper bound for the desired range.
- if (endKey.isEmpty()) {
- startBytes = addHeadByte(startBytes, (byte) 0);
- endBytes = addHeadByte(endBytes, (byte) 1);
- }
-
- // Pad to the longest key.
- int paddedKeyLength = Math.max(startBytes.length, endBytes.length);
- BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength);
- BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength);
-
- // If the keys are equal subject to padding by 0, we can't interpolate.
- BigInteger range = rangeEndInt.subtract(rangeStartInt);
- checkState(
- !range.equals(BigInteger.ZERO),
- "Refusing to interpolate for near-empty %s where start and end keys differ only by trailing"
- + " zero bytes.",
- this);
-
- // Add precision so that range is at least 53 (double mantissa length) bits long. This way, we
- // can interpolate small ranges finely, e.g., split the range key 3 to key 4 into 1024 parts.
- // We add precision to range by adding zero bytes to the end of the keys, aka shifting the
- // underlying BigInteger left by a multiple of 8 bits.
- int bytesNeeded = ((53 - range.bitLength()) + 7) / 8;
- if (bytesNeeded > 0) {
- range = range.shiftLeft(bytesNeeded * 8);
- rangeStartInt = rangeStartInt.shiftLeft(bytesNeeded * 8);
- paddedKeyLength += bytesNeeded;
- }
-
- BigInteger interpolatedOffset =
- new BigDecimal(range).multiply(BigDecimal.valueOf(fraction)).toBigInteger();
-
- int outputKeyLength = endKey.isEmpty() ? (paddedKeyLength - 1) : paddedKeyLength;
- return ByteKey.copyFrom(
- fixupHeadZeros(rangeStartInt.add(interpolatedOffset).toByteArray(), outputKeyLength));
- }
-
- /**
- * Returns new {@link ByteKeyRange} like this one, but with the specified start key.
- */
- public ByteKeyRange withStartKey(ByteKey startKey) {
- return new ByteKeyRange(startKey, endKey);
- }
-
- /**
- * Returns new {@link ByteKeyRange} like this one, but with the specified end key.
- */
- public ByteKeyRange withEndKey(ByteKey endKey) {
- return new ByteKeyRange(startKey, endKey);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////
- private final ByteKey startKey;
- private final ByteKey endKey;
-
- private ByteKeyRange(ByteKey startKey, ByteKey endKey) {
- this.startKey = checkNotNull(startKey, "startKey");
- this.endKey = checkNotNull(endKey, "endKey");
- checkArgument(endsAfterKey(startKey), "Start %s must be less than end %s", startKey, endKey);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(ByteKeyRange.class)
- .add("startKey", startKey)
- .add("endKey", endKey)
- .toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (!(o instanceof ByteKeyRange)) {
- return false;
- }
- ByteKeyRange other = (ByteKeyRange) o;
- return Objects.equals(startKey, other.startKey) && Objects.equals(endKey, other.endKey);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(startKey, endKey);
- }
-
- /**
- * Returns a copy of the specified array with the specified byte added at the front.
- */
- private static byte[] addHeadByte(byte[] array, byte b) {
- byte[] ret = new byte[array.length + 1];
- ret[0] = b;
- System.arraycopy(array, 0, ret, 1, array.length);
- return ret;
- }
-
- /**
- * Ensures the array is exactly {@code size} bytes long. Returns the input array if the condition
- * is met, otherwise either adds or removes zero bytes from the beginning of {@code array}.
- */
- private static byte[] fixupHeadZeros(byte[] array, int size) {
- int padding = size - array.length;
- if (padding == 0) {
- return array;
- }
-
- if (padding < 0) {
- // There is one zero byte at the beginning, added by BigInteger to make there be a sign
- // bit when converting to bytes.
- verify(
- padding == -1,
- "key %s: expected length %d with exactly one byte of padding, found %d",
- ByteKey.copyFrom(array),
- size,
- -padding);
- verify(
- (array[0] == 0) && ((array[1] & 0x80) == 0x80),
- "key %s: is 1 byte longer than expected, indicating BigInteger padding. Expect first byte"
- + " to be zero with set MSB in second byte.",
- ByteKey.copyFrom(array));
- return Arrays.copyOfRange(array, 1, array.length);
- }
-
- byte[] ret = new byte[size];
- System.arraycopy(array, 0, ret, padding, array.length);
- return ret;
- }
-
- /**
- * Returns {@code true} when the specified {@code key} is smaller than this range's end key. The
- * only
- * semantic change from {@code (key.compareTo(getEndKey()) < 0)} is that the empty end key is
- * treated as larger than all possible {@link ByteKey keys}.
- */
- boolean endsAfterKey(ByteKey key) {
- return endKey.isEmpty() || key.compareTo(endKey) < 0;
- }
-
- /** Builds a BigInteger out of the specified array, padded to the desired byte length. */
- private static BigInteger paddedPositiveInt(byte[] bytes, int length) {
- int bytePaddingNeeded = length - bytes.length;
- checkArgument(
-        bytePaddingNeeded >= 0, "Required bytes.length %s < length %s", bytes.length, length);
- BigInteger ret = new BigInteger(1, bytes);
- return (bytePaddingNeeded == 0) ? ret : ret.shiftLeft(8 * bytePaddingNeeded);
- }
-}
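
The fraction semantics above can be exercised directly. A small sketch, assuming ByteKey and ByteKeyRange from this package are on the classpath (the exact byte-level padding of interpolated keys is an implementation detail):

import com.google.cloud.dataflow.sdk.io.range.ByteKey;
import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;

import java.util.List;

public class ByteKeyRangeDemo {
  public static void main(String[] args) {
    // Keys are read as binary fractions in [0.0, 1.0]: 0x80 is halfway
    // through the range [0x40, 0xc0).
    ByteKeyRange range = ByteKeyRange.of(ByteKey.of(0x40), ByteKey.of(0xc0));
    System.out.println(range.estimateFractionForKey(ByteKey.of(0x80))); // 0.5

    // interpolateKey is the inverse mapping; the midpoint of the full
    // keyspace starts with byte 0x80 (trailing zero bytes may be appended
    // for extra precision).
    System.out.println(ByteKeyRange.ALL_KEYS.interpolateKey(0.5));

    // split(4) returns up to numSplits + 1 keys: the start key, three
    // interpolated keys, and the end key.
    List<ByteKey> splits = range.split(4);
    System.out.println(splits.size()); // 5
  }
}
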
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
deleted file mode 100644
index f6796cc..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RangeTracker} for {@link ByteKey ByteKeys} in {@link ByteKeyRange ByteKeyRanges}.
- *
- * @see ByteKey
- * @see ByteKeyRange
- */
-public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
- private static final Logger logger = LoggerFactory.getLogger(ByteKeyRangeTracker.class);
-
- /** Instantiates a new {@link ByteKeyRangeTracker} with the specified range. */
- public static ByteKeyRangeTracker of(ByteKeyRange range) {
- return new ByteKeyRangeTracker(range);
- }
-
- @Override
- public synchronized ByteKey getStartPosition() {
- return range.getStartKey();
- }
-
- @Override
- public synchronized ByteKey getStopPosition() {
- return range.getEndKey();
- }
-
- @Override
- public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
- if (isAtSplitPoint && !range.containsKey(recordStart)) {
- return false;
- }
- position = recordStart;
- return true;
- }
-
- @Override
- public synchronized boolean trySplitAtPosition(ByteKey splitPosition) {
- // Unstarted.
- if (position == null) {
- logger.warn(
- "{}: Rejecting split request at {} because no records have been returned.",
- this,
- splitPosition);
- return false;
- }
-
- // Started, but not after current position.
- if (splitPosition.compareTo(position) <= 0) {
- logger.warn(
- "{}: Rejecting split request at {} because it is not after current position {}.",
- this,
- splitPosition,
- position);
- return false;
- }
-
- // Sanity check.
- if (!range.containsKey(splitPosition)) {
- logger.warn(
- "{}: Rejecting split request at {} because it is not within the range.",
- this,
- splitPosition);
- return false;
- }
-
- range = range.withEndKey(splitPosition);
- return true;
- }
-
- @Override
- public synchronized double getFractionConsumed() {
- if (position == null) {
- return 0;
- }
- return range.estimateFractionForKey(position);
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- private ByteKeyRange range;
- @Nullable private ByteKey position;
-
- private ByteKeyRangeTracker(ByteKeyRange range) {
- this.range = range;
- this.position = null;
- }
-
- @Override
- public String toString() {
- return toStringHelper(ByteKeyRangeTracker.class)
- .add("range", range)
- .add("position", position)
- .toString();
- }
-}
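
A sketch of how a reader might drive the tracker above; the keys are illustrative, and a real reader would obtain them from its record stream:

import com.google.cloud.dataflow.sdk.io.range.ByteKey;
import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;

public class ByteKeyTrackerDemo {
  public static void main(String[] args) {
    ByteKeyRangeTracker tracker =
        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x90)));

    // Each record is first offered to the tracker; a false return means the
    // record falls outside the (possibly shrunk) range and reading must stop.
    for (ByteKey recordKey :
        new ByteKey[] {ByteKey.of(0x20), ByteKey.of(0x40), ByteKey.of(0x60)}) {
      if (!tracker.tryReturnRecordAt(/* isAtSplitPoint= */ true, recordKey)) {
        break;
      }
    }

    // A split is accepted only after the current position and within range.
    System.out.println(tracker.trySplitAtPosition(ByteKey.of(0x80))); // true
    System.out.println(tracker.getStopPosition());                   // [80]
    System.out.println(tracker.getFractionConsumed());
  }
}
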
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
deleted file mode 100644
index b237217..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link RangeTracker} for non-negative positions of type {@code long}.
- */
-public class OffsetRangeTracker implements RangeTracker<Long> {
- private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
-
- private final long startOffset;
- private long stopOffset;
- private long lastRecordStart = -1L;
- private long offsetOfLastSplitPoint = -1L;
-
- /**
- * Offset corresponding to infinity. This can only be used as the upper-bound of a range, and
- * indicates reading all of the records until the end without specifying exactly what the end is.
- *
- * <p>Infinite ranges cannot be split because it is impossible to estimate progress within them.
- */
- public static final long OFFSET_INFINITY = Long.MAX_VALUE;
-
- /**
- * Creates an {@code OffsetRangeTracker} for the specified range.
- */
- public OffsetRangeTracker(long startOffset, long stopOffset) {
- this.startOffset = startOffset;
- this.stopOffset = stopOffset;
- }
-
- @Override
- public synchronized Long getStartPosition() {
- return startOffset;
- }
-
- @Override
- public synchronized Long getStopPosition() {
- return stopOffset;
- }
-
- @Override
- public boolean tryReturnRecordAt(boolean isAtSplitPoint, Long recordStart) {
- return tryReturnRecordAt(isAtSplitPoint, recordStart.longValue());
- }
-
- public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) {
- if (lastRecordStart == -1 && !isAtSplitPoint) {
- throw new IllegalStateException(
- String.format("The first record [starting at %d] must be at a split point", recordStart));
- }
- if (recordStart < lastRecordStart) {
- throw new IllegalStateException(
- String.format(
- "Trying to return record [starting at %d] "
- + "which is before the last-returned record [starting at %d]",
- recordStart,
- lastRecordStart));
- }
- if (isAtSplitPoint) {
- if (offsetOfLastSplitPoint != -1L && recordStart == offsetOfLastSplitPoint) {
- throw new IllegalStateException(
- String.format(
- "Record at a split point has same offset as the previous split point: "
- + "previous split point at %d, current record starts at %d",
- offsetOfLastSplitPoint, recordStart));
- }
- if (recordStart >= stopOffset) {
- return false;
- }
- offsetOfLastSplitPoint = recordStart;
- }
-
- lastRecordStart = recordStart;
- return true;
- }
-
- @Override
- public boolean trySplitAtPosition(Long splitOffset) {
- return trySplitAtPosition(splitOffset.longValue());
- }
-
- public synchronized boolean trySplitAtPosition(long splitOffset) {
- if (stopOffset == OFFSET_INFINITY) {
- LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset);
- return false;
- }
- if (lastRecordStart == -1) {
- LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset);
- return false;
- }
-
- // Note: technically it is correct to split at any position after the last returned
- // split point, not just the last returned record.
- // TODO: Investigate whether in practice this is useful or, rather, confusing.
- if (splitOffset <= lastRecordStart) {
- LOG.debug(
- "Refusing to split {} at {}: already past proposed split position", this, splitOffset);
- return false;
- }
- if (splitOffset < startOffset || splitOffset >= stopOffset) {
- LOG.debug(
- "Refusing to split {} at {}: proposed split position out of range", this, splitOffset);
- return false;
- }
- LOG.debug("Agreeing to split {} at {}", this, splitOffset);
- this.stopOffset = splitOffset;
- return true;
- }
-
- /**
- * Returns a position {@code P} such that the range {@code [start, P)} represents approximately
- * the given fraction of the range {@code [start, end)}. Assumes that the density of records
- * in the range is approximately uniform.
- */
- public synchronized long getPositionForFractionConsumed(double fraction) {
- if (stopOffset == OFFSET_INFINITY) {
- throw new IllegalArgumentException(
- "getPositionForFractionConsumed is not applicable to an unbounded range: " + this);
- }
- return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset));
- }
-
- @Override
- public synchronized double getFractionConsumed() {
- if (stopOffset == OFFSET_INFINITY) {
- return 0.0;
- }
- if (lastRecordStart == -1) {
- return 0.0;
- }
- // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
- // which is (4 - 3 + 1) / (6 - 3) = 67%.
- // Also, clamp to at most 1.0 because the last consumed position can extend past the
- // stop position.
- return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
- }
-
- @Override
- public synchronized String toString() {
- String stopString = (stopOffset == OFFSET_INFINITY) ? "infinity" : String.valueOf(stopOffset);
- if (lastRecordStart >= 0) {
- return String.format(
- "<at [starting at %d] of offset range [%d, %s)>",
- lastRecordStart,
- startOffset,
- stopString);
- } else {
- return String.format("<unstarted in offset range [%d, %s)>", startOffset, stopString);
- }
- }
-
- /**
- * Returns a copy of this tracker for testing purposes (to simplify testing methods with
- * side effects).
- */
- @VisibleForTesting
- OffsetRangeTracker copy() {
- OffsetRangeTracker res = new OffsetRangeTracker(startOffset, stopOffset);
- res.lastRecordStart = this.lastRecordStart;
- return res;
- }
-}
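
A sketch reproducing the worked example in getFractionConsumed() above, reading the offset range [3, 6):

import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;

public class OffsetTrackerDemo {
  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);

    tracker.tryReturnRecordAt(true, 3L); // the first record must be a split point
    tracker.tryReturnRecordAt(true, 4L);

    // Consumed 3 and 4 of {3, 4, 5}: (4 - 3 + 1) / (6 - 3) = 2/3.
    System.out.println(tracker.getFractionConsumed()); // 0.666...

    // Splitting at 5 shrinks the range to [3, 5).
    System.out.println(tracker.trySplitAtPosition(5L)); // true
    System.out.println(tracker.getStopPosition());      // 5
  }
}
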
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
deleted file mode 100644
index 84359f1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.io.range;
-
-/**
- * A {@code RangeTracker} is a thread-safe helper object for implementing dynamic work rebalancing
- * in position-based {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader}
- * subclasses.
- *
- * <h3>Usage of the RangeTracker class hierarchy</h3>
- * The abstract {@code RangeTracker} interface should not be used per se - all users should use its
- * subclasses directly. We declare it here because all subclasses have roughly the same interface
- * and the same properties, to centralize the documentation. Currently we provide two
- * implementations: {@link OffsetRangeTracker} and {@link ByteKeyRangeTracker}.
- *
- * <h3>Position-based sources</h3>
- * A position-based source is one where the source can be described by a range of positions of
- * an ordered type and the records returned by the reader can be described by positions of the
- * same type.
- *
- * <p>In case a record occupies a range of positions in the source, the most important thing about
- * the record is the position where it starts.
- *
- * <p>Defining the semantics of positions for a source is entirely up to the source class. However,
- * the chosen definitions have to obey certain properties in order to make it possible to correctly
- * split the source into parts, including dynamic splitting. Two main aspects need to be defined:
- * <ul>
- * <li>How to assign starting positions to records.
- * <li>Which records should be read by a source with a range {@code [A, B)}.
- * </ul>
- * Moreover, reading a range must be <i>efficient</i>, i.e., the performance of reading a range
- * should not significantly depend on the location of the range. For example, reading the range
- * {@code [A, B)} should not require reading all data before {@code A}.
- *
- * <p>The sections below explain exactly what properties these definitions must satisfy, and
- * how to use a {@code RangeTracker} with a properly defined source.
- *
- * <h3>Properties of position-based sources</h3>
- * The main requirement for position-based sources is <i>associativity</i>: reading records from
- * {@code [A, B)} and records from {@code [B, C)} should give the same records as reading from
- * {@code [A, C)}, where {@code A <= B <= C}. This property ensures that no matter how a range
- * of positions is split into arbitrarily many sub-ranges, the total set of records described by
- * them stays the same.
- *
- * <p>The other important property is how the source's range relates to positions of records in
- * the source. In many sources each record can be identified by a unique starting position.
- * In this case:
- * <ul>
- * <li>All records returned by a source {@code [A, B)} must have starting positions
- * in this range.
- * <li>All but the last record should end within this range. The last record may or may not
- * extend past the end of the range.
- * <li>Records should not overlap.
- * </ul>
- * Such sources should define "read {@code [A, B)}" as "read from the first record starting at or
- * after A, up to but not including the first record starting at or after B".
- *
- * <p>Some examples of such sources include reading lines or CSV from a text file, reading keys and
- * values from a BigTable, etc.
- *
- * <p>The concept of <i>split points</i> allows us to extend the definitions for dealing with
- * sources
- * where some records cannot be identified by a unique starting position.
- *
- * <p>In all cases, all records returned by a source {@code [A, B)} must <i>start</i> at or after
- * {@code A}.
- *
- * <h3>Split points</h3>
- *
- * <p>Some sources may have records that are not directly addressable. For example, imagine a file
- * format consisting of a sequence of compressed blocks. Each block can be assigned an offset, but
- * records within the block cannot be directly addressed without decompressing the block. Let us
- * refer to this hypothetical format as <i>CBF (Compressed Blocks Format)</i>.
- *
- * <p>Many such formats can still satisfy the associativity property. For example, in CBF, reading
- * {@code [A, B)} can mean "read all the records in all blocks whose starting offset is in
- * {@code [A, B)}".
- *
- * <p>To support such complex formats, we introduce the notion of <i>split points</i>. We say that
- * a record is a split point if there exists a position {@code A} such that the record is the first
- * one to be returned when reading the range {@code [A, infinity)}. In CBF, the only split points
- * would be the first records in each block.
- *
- * <p>Split points allow us to define the meaning of a record's position and a source's range
- * in all cases:
- * <ul>
- * <li>For a record that is at a split point, its position is defined to be the largest
- * {@code A} such that reading a source with the range {@code [A, infinity)} returns this record;
- * <li>Positions of other records are only required to be non-decreasing;
- * <li>Reading the source {@code [A, B)} must return records starting from the first split point
- * at or after {@code A}, up to but not including the first split point at or after {@code B}.
- * In particular, this means that the first record returned by a source MUST always be
- * a split point.
- * <li>Positions of split points must be unique.
- * </ul>
- * As a result, for any decomposition of the full range of the source into position ranges, the
- * total set of records will be the full set of records in the source, and each record
- * will be read exactly once.
- *
- * <h3>Consumed positions</h3>
- * As the source is being read, and records read from it are being passed to the downstream
- * transforms in the pipeline, we say that positions in the source are being <i>consumed</i>.
- * When a reader has read a record (or promised to a caller that a record will be returned),
- * positions up to and including the record's start position are considered <i>consumed</i>.
- *
- * <p>Dynamic splitting can happen only at <i>unconsumed</i> positions. If the reader just
- * returned a record at offset 42 in a file, dynamic splitting can happen only at offset 43 or
- * beyond, as otherwise that record could be read twice (by the current reader and by a reader
- * of the task starting at 43).
- *
- * <h3>Example</h3>
- * The following example uses an {@link OffsetRangeTracker} to support dynamically splitting
- * a source with integer positions (offsets).
- * <pre> {@code
- * class MyReader implements BoundedReader<Foo> {
- * private MySource currentSource;
- *   private final OffsetRangeTracker tracker;
- * ...
- * MyReader(MySource source) {
- * this.currentSource = source;
- *     this.tracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
- * }
- * ...
- * boolean start() {
- * ... (general logic for locating the first record) ...
- * if (!tracker.tryReturnRecordAt(true, recordStartOffset)) return false;
- * ... (any logic that depends on the record being returned, e.g. counting returned records)
- * return true;
- * }
- * boolean advance() {
- * ... (general logic for locating the next record) ...
- * if (!tracker.tryReturnRecordAt(isAtSplitPoint, recordStartOffset)) return false;
- * ... (any logic that depends on the record being returned, e.g. counting returned records)
- * return true;
- * }
- *
- * double getFractionConsumed() {
- * return tracker.getFractionConsumed();
- * }
- * }
- * } </pre>
- *
- * <h3>Usage with different models of iteration</h3>
- * When using this class to protect a
- * {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader}, follow the pattern
- * described above.
- *
- * <p>When using this class to protect iteration in the {@code hasNext()/next()}
- * model, consider the record consumed when {@code hasNext()} is about to return true, rather than
- * when {@code next()} is called, because {@code hasNext()} returning true is promising the caller
- * that {@code next()} will have an element to return - so {@link #trySplitAtPosition} must not
- * split the range in a way that would make the record promised by {@code hasNext()} belong to
- * a different range.
- *
- * <p>Also note that implementations of {@code hasNext()} need to ensure
- * that they call {@link #tryReturnRecordAt} only once even if {@code hasNext()} is called
- * repeatedly, due to the requirement on uniqueness of split point positions.
- *
- * @param <PositionT> Type of positions used by the source to define ranges and identify records.
- */
-public interface RangeTracker<PositionT> {
- /**
- * Returns the starting position of the current range, inclusive.
- */
- PositionT getStartPosition();
-
- /**
- * Returns the ending position of the current range, exclusive.
- */
- PositionT getStopPosition();
-
- /**
- * Atomically determines whether a record at the given position can be returned and updates
- * internal state. In particular:
- * <ul>
- * <li>If {@code isAtSplitPoint} is {@code true}, and {@code recordStart} is outside the current
- * range, returns {@code false};
- * <li>Otherwise, updates the last-consumed position to {@code recordStart} and returns
- * {@code true}.
- * </ul>
- * <p>This method MUST be called on all split point records. It may be called on every record.
- */
- boolean tryReturnRecordAt(boolean isAtSplitPoint, PositionT recordStart);
-
- /**
- * Atomically splits the current range [{@link #getStartPosition}, {@link #getStopPosition})
- * into a "primary" part [{@link #getStartPosition}, {@code splitPosition})
- * and a "residual" part [{@code splitPosition}, {@link #getStopPosition}), assuming the current
- * last-consumed position is within [{@link #getStartPosition}, splitPosition)
- * (i.e., {@code splitPosition} has not been consumed yet).
- *
- * <p>Updates the current range to be the primary and returns {@code true}. This means that
- * all further calls on the current object will interpret their arguments relative to the
- * primary range.
- *
- * <p>If the split position has already been consumed, or if no {@link #tryReturnRecordAt} call
- * was made yet, returns {@code false}. The second condition is to prevent dynamic splitting
- * during reader start-up.
- */
- boolean trySplitAtPosition(PositionT splitPosition);
-
- /**
- * Returns the approximate fraction of positions in the source that have been consumed by
- * successful {@link #tryReturnRecordAt} calls, or 0.0 if no such calls have happened.
- */
- double getFractionConsumed();
-}
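
The hasNext()/next() guidance above is easiest to see in code. A hedged sketch, assuming it lives in the same package as RangeTracker; readNextRecord() and startOffsetOf() are hypothetical helpers, and every record is assumed to be a split point:

import java.util.NoSuchElementException;

abstract class TrackedIterator<T> {
  private final RangeTracker<Long> tracker;
  private T next; // the record already promised by hasNext(), if any

  TrackedIterator(RangeTracker<Long> tracker) {
    this.tracker = tracker;
  }

  public boolean hasNext() {
    if (next != null) {
      return true; // already promised; do not call tryReturnRecordAt again
    }
    T candidate = readNextRecord();
    if (candidate == null
        || !tracker.tryReturnRecordAt(true, startOffsetOf(candidate))) {
      return false; // end of data, or the tracker's range no longer covers it
    }
    next = candidate;
    return true;
  }

  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T result = next;
    next = null;
    return result;
  }

  abstract T readNextRecord();            // hypothetical: fetch the next raw record
  abstract long startOffsetOf(T record);  // hypothetical: its start position
}
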
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
deleted file mode 100644
index beb77bf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Provides thread-safe helpers for implementing dynamic work rebalancing in position-based
- * bounded sources.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} to get started.
- */
-package com.google.cloud.dataflow.sdk.io.range;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
deleted file mode 100644
index 60d62d3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options that allow setting the application name.
- */
-public interface ApplicationNameOptions extends PipelineOptions {
- /**
- * Name of application, for display purposes.
- *
- * <p>Defaults to the name of the class that constructs the {@link PipelineOptions}
- * via the {@link PipelineOptionsFactory}.
- */
- @Description("Name of application for display purposes. Defaults to the name of the class that "
- + "constructs the PipelineOptions via the PipelineOptionsFactory.")
- String getAppName();
- void setAppName(String value);
-}
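
A minimal usage sketch for this interface (the application name used here is made up); as(...) re-interprets the same underlying options object through another options interface:

    import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class AppNameExample {
      public static void main(String[] args) {
        ApplicationNameOptions options =
            PipelineOptionsFactory.create().as(ApplicationNameOptions.class);
        // Unless overridden, appName defaults to the name of the class that
        // constructed the options via PipelineOptionsFactory.
        options.setAppName("MyAnalysisPipeline");
        System.out.println(options.getAppName());
      }
    }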
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
deleted file mode 100644
index ed4eb24..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Properties needed when using BigQuery with the Dataflow SDK.
- */
-@Description("Options that are used to configure BigQuery. See "
- + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
- @Description("Temporary dataset for BigQuery table operations. "
- + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
- @Default.String("bigquery.googleapis.com/cloud_dataflow")
- String getTempDatasetId();
- void setTempDatasetId(String value);
-}
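
A sketch of overriding the temporary dataset from the command line, assuming a hypothetical dataset name my_dataset:

    import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class BigQueryOptionsExample {
      public static void main(String[] args) {
        BigQueryOptions options = PipelineOptionsFactory
            .fromArgs(new String[] {
                "--tempDatasetId=bigquery.googleapis.com/my_dataset"})
            .as(BigQueryOptions.class);
        System.out.println(options.getTempDatasetId());
      }
    }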
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
deleted file mode 100644
index 43a46b0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
-
-/**
- * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
- */
-@Description("Configure options on the BlockingDataflowPipelineRunner.")
-public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
- /**
- * Output stream for job status messages.
- */
- @Description("Where messages generated during execution of the Dataflow job will be output.")
- @JsonIgnore
- @Hidden
- @Default.InstanceFactory(StandardOutputFactory.class)
- PrintStream getJobMessageOutput();
- void setJobMessageOutput(PrintStream value);
-
- /**
- * Returns a default of {@link System#out}.
- */
- public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
- @Override
- public PrintStream create(PipelineOptions options) {
- return System.out;
- }
- }
-}
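
StandardOutputFactory above is an instance of the DefaultValueFactory pattern: the factory is invoked only when the option has not been set explicitly. A sketch of a custom default (StandardErrorFactory is hypothetical, not part of the SDK):

    import java.io.PrintStream;

    import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;

    /** Hypothetical factory that defaults job messages to standard error. */
    public class StandardErrorFactory implements DefaultValueFactory<PrintStream> {
      @Override
      public PrintStream create(PipelineOptions options) {
        return System.err;
      }
    }

It would be wired up by annotating the getter with @Default.InstanceFactory(StandardErrorFactory.class) in place of the factory used above.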
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
deleted file mode 100644
index 2e1ad94..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.clouddebugger.v2.model.Debuggee;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import javax.annotation.Nullable;
-
-/**
- * Options for controlling Cloud Debugger.
- */
-@Description("[Experimental] Used to configure the Cloud Debugger")
-@Experimental
-@Hidden
-public interface CloudDebuggerOptions {
-
- /**
- * Whether to enable the Cloud Debugger snapshot agent for the current job.
- */
- @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
- boolean getEnableCloudDebugger();
- void setEnableCloudDebugger(boolean enabled);
-
- @Description("The Cloud Debugger debugee to associate with. This should not be set directly.")
- @Hidden
- @Nullable Debuggee getDebuggee();
- void setDebuggee(Debuggee debuggee);
-}
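
Since DataflowPipelineOptions extends this interface, the flag can be flipped from the command line; a minimal sketch (boolean options parse to true from a bare flag):

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class DebuggerFlagExample {
      public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(new String[] {"--enableCloudDebugger"})
            .as(DataflowPipelineOptions.class);
        System.out.println(options.getEnableCloudDebugger());  // true
      }
    }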
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
deleted file mode 100644
index cadc011..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
-import com.google.cloud.dataflow.sdk.util.GcsStager;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.Stager;
-import com.google.cloud.dataflow.sdk.util.Transport;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Internal. Options used to control execution of the Dataflow SDK for
- * debugging and testing purposes.
- */
-@Description("[Internal] Options used to control execution of the Dataflow SDK for "
- + "debugging and testing purposes.")
-@Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
- /**
- * The list of backend experiments to enable.
- *
- * <p>Dataflow provides a number of experimental features that can be enabled
- * with this flag.
- *
- * <p>Please sync with the Dataflow team before enabling any experiments.
- */
- @Description("[Experimental] Dataflow provides a number of experimental features that can "
- + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
- + "experiments.")
- @Experimental
- List<String> getExperiments();
- void setExperiments(List<String> value);
-
- /**
- * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
- * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
- * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
- */
- @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
- + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
- + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
- @Default.String(Dataflow.DEFAULT_ROOT_URL)
- String getApiRootUrl();
- void setApiRootUrl(String value);
-
- /**
- * Dataflow endpoint to use.
- *
- * <p>Defaults to the version of the Google Cloud Dataflow API that was
- * current at the time this SDK version was released.
- *
- * <p>If the string contains "://", then this is treated as a URL,
- * otherwise {@link #getApiRootUrl()} is used as the root
- * URL.
- */
- @Description("The URL for the Dataflow API. If the string contains \"://\", this"
- + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
- @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
- String getDataflowEndpoint();
- void setDataflowEndpoint(String value);
-
- /**
- * The path to which the translated Dataflow job specification is written
- * at job submission time. The job specification is represented in JSON format.
- */
- @Description("The path to which the translated Dataflow job specification is written "
- + "at job submission time. The job specification is represented in JSON format.")
- String getDataflowJobFile();
- void setDataflowJobFile(String value);
-
- /**
- * The class of the validator that should be created and used to validate paths.
- * If pathValidator has not been set explicitly, an instance of this class will be
- * constructed and used as the path validator.
- */
- @Description("The class of the validator that should be created and used to validate paths. "
- + "If pathValidator has not been set explicitly, an instance of this class will be "
- + "constructed and used as the path validator.")
- @Default.Class(DataflowPathValidator.class)
- Class<? extends PathValidator> getPathValidatorClass();
- void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
- /**
- * The path validator instance that should be used to validate paths.
- * If no path validator has been set explicitly, the default is to use the instance factory that
- * constructs a path validator based upon the currently set pathValidatorClass.
- */
- @JsonIgnore
- @Description("The path validator instance that should be used to validate paths. "
- + "If no path validator has been set explicitly, the default is to use the instance factory "
- + "that constructs a path validator based upon the currently set pathValidatorClass.")
- @Default.InstanceFactory(PathValidatorFactory.class)
- PathValidator getPathValidator();
- void setPathValidator(PathValidator validator);
-
- /**
- * The class responsible for staging resources to be accessible by workers
- * during job execution. If stager has not been set explicitly, an instance of this class
- * will be created and used as the resource stager.
- */
- @Description("The class of the stager that should be created and used to stage resources. "
- + "If stager has not been set explicitly, an instance of the this class will be created "
- + "and used as the resource stager.")
- @Default.Class(GcsStager.class)
- Class<? extends Stager> getStagerClass();
- void setStagerClass(Class<? extends Stager> stagerClass);
-
- /**
- * The resource stager instance that should be used to stage resources.
- * If no stager has been set explicitly, the default is to use the instance factory
- * that constructs a resource stager based upon the currently set stagerClass.
- */
- @JsonIgnore
- @Description("The resource stager instance that should be used to stage resources. "
- + "If no stager has been set explicitly, the default is to use the instance factory "
- + "that constructs a resource stager based upon the currently set stagerClass.")
- @Default.InstanceFactory(StagerFactory.class)
- Stager getStager();
- void setStager(Stager stager);
-
- /**
- * An instance of the Dataflow client. Defaults to creating a Dataflow client
- * using the current set of options.
- */
- @JsonIgnore
- @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
- + "using the current set of options.")
- @Default.InstanceFactory(DataflowClientFactory.class)
- Dataflow getDataflowClient();
- void setDataflowClient(Dataflow value);
-
- /** Returns the default Dataflow client built from the given PipelineOptions. */
- public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
- @Override
- public Dataflow create(PipelineOptions options) {
- return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build();
- }
- }
-
- /**
- * Root URL for use with the Pubsub API.
- */
- @Description("Root URL for use with the Pubsub API")
- @Default.String("https://pubsub.googleapis.com")
- String getPubsubRootUrl();
- void setPubsubRootUrl(String value);
-
- /**
- * Whether to update the currently running pipeline with the same name as this one.
- *
- * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
- */
- @Deprecated
- @Description("If set, replace the existing pipeline with the name specified by --jobName with "
- + "this pipeline, preserving state.")
- boolean getUpdate();
- @Deprecated
- void setUpdate(boolean value);
-
- /**
- * Mapping of old PTransform names to new ones, specified as JSON
- * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
- * empty string.
- */
- @JsonIgnore
- @Description(
- "Mapping of old PTranform names to new ones, specified as JSON "
- + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
- + "string.")
- Map<String, String> getTransformNameMapping();
- void setTransformNameMapping(Map<String, String> value);
-
- /**
- * Custom windmill_main binary to use with the streaming runner.
- */
- @Description("Custom windmill_main binary to use with the streaming runner")
- String getOverrideWindmillBinary();
- void setOverrideWindmillBinary(String value);
-
- /**
- * Number of threads to use on the Dataflow worker harness. If left unspecified,
- * the Dataflow service will compute an appropriate number of threads to use.
- */
- @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
- + "the Dataflow service will compute an appropriate number of threads to use.")
- int getNumberOfWorkerHarnessThreads();
- void setNumberOfWorkerHarnessThreads(int value);
-
- /**
- * If {@code true}, save a heap dump before killing a thread or process that is GC
- * thrashing or out of memory. The location of the heap file will either be echoed back
- * to the user, or the user will be given the opportunity to download the heap file.
- *
- * <p>CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider
- * increasing the boot disk size before setting this flag to true.
- */
- @Description("If {@literal true}, save a heap dump before killing a thread or process "
- + "which is GC thrashing or out of memory.")
- boolean getDumpHeapOnOOM();
- void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
-
- /**
- * Creates a {@link PathValidator} object using the class specified in
- * {@link #getPathValidatorClass()}.
- */
- public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
- @Override
- public PathValidator create(PipelineOptions options) {
- DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- return InstanceBuilder.ofType(PathValidator.class)
- .fromClass(debugOptions.getPathValidatorClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-
- /**
- * Creates a {@link Stager} object using the class specified in
- * {@link #getStagerClass()}.
- */
- public static class StagerFactory implements DefaultValueFactory<Stager> {
- @Override
- public Stager create(PipelineOptions options) {
- DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- return InstanceBuilder.ofType(Stager.class)
- .fromClass(debugOptions.getStagerClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-}
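
The apiRootUrl/dataflowEndpoint contract documented above reduces to a small rule; a sketch of that rule (an illustration of the documented behavior, not the SDK's actual resolution code):

    /** Illustrates the documented endpoint resolution rule. */
    public class EndpointResolution {
      static String resolveDataflowUrl(String apiRootUrl, String dataflowEndpoint) {
        // An endpoint containing "://" is treated as a complete URL and
        // overrides apiRootUrl entirely.
        if (dataflowEndpoint.contains("://")) {
          return dataflowEndpoint;
        }
        // Otherwise the endpoint is a service path appended to the root URL.
        return apiRootUrl.endsWith("/")
            ? apiRootUrl + dataflowEndpoint
            : apiRootUrl + "/" + dataflowEndpoint;
      }
    }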
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
deleted file mode 100644
index 1aa4342..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-/**
- * Options that can be used to configure the {@link DataflowPipeline}.
- */
-@Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
- PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
- DataflowPipelineWorkerPoolOptions, BigQueryOptions,
- GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
- DataflowProfilingOptions {
-
- static final String DATAFLOW_STORAGE_LOCATION = "Dataflow Storage Location";
-
- @Description("Project id. Required when running a Dataflow in the cloud. "
- + "See https://cloud.google.com/storage/docs/projects for further details.")
- @Override
- @Validation.Required
- @Default.InstanceFactory(DefaultProjectFactory.class)
- String getProject();
- @Override
- void setProject(String value);
-
- /**
- * GCS path for temporary files, e.g. gs://bucket/object
- *
- * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
- *
- * <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
- * {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
- * {@link #getStagingLocation()}.
- */
- @Description("GCS path for temporary files, eg \"gs://bucket/object\". "
- + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
- + "At least one of tempLocation or stagingLocation must be set. If tempLocation is unset, "
- + "defaults to using stagingLocation.")
- @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
- String getTempLocation();
- void setTempLocation(String value);
-
- /**
- * GCS path for staging local files, e.g. gs://bucket/object
- *
- * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
- *
- * <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
- * {@link #getStagingLocation()} is not set, then the Dataflow pipeline defaults to using
- * {@link #getTempLocation()}.
- */
- @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
- + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
- + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
- + "defaults to using tempLocation.")
- @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
- String getStagingLocation();
- void setStagingLocation(String value);
-
- /**
- * The Dataflow job name is used as an idempotence key within the Dataflow service.
- * While a job with a given name is active, a second active job with the same name
- * cannot be created. Defaults to ApplicationName-UserName-Date.
- */
- @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
- + "While a job with a given name is active, a second active job with the same name cannot "
- + "be created. Defaults to ApplicationName-UserName-Date.")
- @Default.InstanceFactory(JobNameFactory.class)
- String getJobName();
- void setJobName(String value);
-
- /**
- * Whether to update the currently running pipeline with the same name as this one.
- */
- @Override
- @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
- @Description(
- "If set, replace the existing pipeline with the name specified by --jobName with "
- + "this pipeline, preserving state.")
- boolean getUpdate();
- @Override
- @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
- void setUpdate(boolean value);
-
- /**
- * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
- * local system user name (if available), and the current time. The normalization ensures that the
- * job name matches the required pattern [a-z]([-a-z0-9]*[a-z0-9])? and stays within the
- * 40-character length limit.
- *
- * <p>This job name factory is only able to generate one unique name per second per application
- * and user combination.
- */
- public static class JobNameFactory implements DefaultValueFactory<String> {
- private static final DateTimeFormatter FORMATTER =
- DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
- @Override
- public String create(PipelineOptions options) {
- String appName = options.as(ApplicationNameOptions.class).getAppName();
- String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
- : appName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0")
- .replaceAll("^[^a-z]", "a");
- String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
- String normalizedUserName = userName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0");
- String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
- return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
- }
- }
-}
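
The normalization inside JobNameFactory can be exercised in isolation; a minimal sketch with a made-up application name:

    public class JobNameNormalization {
      public static void main(String[] args) {
        String appName = "My App_2015!";
        String normalized = appName.toLowerCase()
            .replaceAll("[^a-z0-9]", "0")   // non-alphanumerics become '0'
            .replaceAll("^[^a-z]", "a");    // force a leading letter
        System.out.println(normalized);     // prints "my0app020150"
      }
    }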