Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:03:02 UTC
[07/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
index df2308d..16fc6fa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
@@ -17,34 +17,15 @@
*/
package org.apache.beam.sdk.util;
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Arrays;
-import java.util.List;
-
-import javax.annotation.Nullable;
/**
* A set of utilities for working with Avro files.
@@ -154,192 +135,4 @@ public class AvroUtils {
}
return new AvroMetadata(syncMarker, codec, schemaString);
}
-
- /**
- * Formats BigQuery seconds-since-epoch into a String matching the JSON export. Thread-safe and
- * immutable.
- */
- private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
- DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
- // Package private for BigQueryTableRowIterator to use.
- static String formatTimestamp(String timestamp) {
- // timestamp is in "seconds since epoch" format, with scientific notation.
- // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
- // Separate into seconds and microseconds.
- double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
- long timestampMicros = (long) timestampDoubleMicros;
- long seconds = timestampMicros / 1000000;
- int micros = (int) (timestampMicros % 1000000);
- String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
-
- // No sub-second component.
- if (micros == 0) {
- return String.format("%s UTC", dayAndTime);
- }
-
- // Sub-second component.
- int digits = 6;
- int subsecond = micros;
- while (subsecond % 10 == 0) {
- digits--;
- subsecond /= 10;
- }
- String formatString = String.format("%%0%dd", digits);
- String fractionalSeconds = String.format(formatString, subsecond);
- return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
- }
-
- /**
- * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
- *
- * See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">
- * "Avro format"</a> for more information.
- */
- public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
- return convertGenericRecordToTableRow(record, schema.getFields());
- }
-
- private static TableRow convertGenericRecordToTableRow(
- GenericRecord record, List<TableFieldSchema> fields) {
- TableRow row = new TableRow();
- for (TableFieldSchema subSchema : fields) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
- // is required, so it may not be null.
- Field field = record.getSchema().getField(subSchema.getName());
- Object convertedValue =
- getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
- if (convertedValue != null) {
- // To match the JSON files exported by BigQuery, do not include null values in the output.
- row.set(field.name(), convertedValue);
- }
- }
- return row;
- }
-
- @Nullable
- private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
- // is optional (and so it may be null), but defaults to "NULLABLE".
- String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
- switch (mode) {
- case "REQUIRED":
- return convertRequiredField(schema.getType(), fieldSchema, v);
- case "REPEATED":
- return convertRepeatedField(schema, fieldSchema, v);
- case "NULLABLE":
- return convertNullableField(schema, fieldSchema, v);
- default:
- throw new UnsupportedOperationException(
- "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
- }
- }
-
- private static List<Object> convertRepeatedField(
- Schema schema, TableFieldSchema fieldSchema, Object v) {
- Type arrayType = schema.getType();
- verify(
- arrayType == Type.ARRAY,
- "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
- fieldSchema.getName(),
- arrayType);
- // REPEATED fields are represented as Avro arrays.
- if (v == null) {
- // Handle the case of an empty repeated field.
- return ImmutableList.of();
- }
- @SuppressWarnings("unchecked")
- List<Object> elements = (List<Object>) v;
- ImmutableList.Builder<Object> values = ImmutableList.builder();
- Type elementType = schema.getElementType().getType();
- for (Object element : elements) {
- values.add(convertRequiredField(elementType, fieldSchema, element));
- }
- return values.build();
- }
-
- private static Object convertRequiredField(
- Type avroType, TableFieldSchema fieldSchema, Object v) {
- // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
- // INTEGER type maps to an Avro LONG type.
- checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
- ImmutableMap<String, Type> fieldMap =
- ImmutableMap.<String, Type>builder()
- .put("STRING", Type.STRING)
- .put("INTEGER", Type.LONG)
- .put("FLOAT", Type.DOUBLE)
- .put("BOOLEAN", Type.BOOLEAN)
- .put("TIMESTAMP", Type.LONG)
- .put("RECORD", Type.RECORD)
- .build();
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
- // is required, so it may not be null.
- String bqType = fieldSchema.getType();
- Type expectedAvroType = fieldMap.get(bqType);
- verify(
- avroType == expectedAvroType,
- "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
- expectedAvroType,
- avroType,
- bqType,
- fieldSchema.getName());
- switch (fieldSchema.getType()) {
- case "STRING":
- // Avro will use a CharSequence to represent String objects, but it may not always use
- // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
- case "INTEGER":
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- return ((Long) v).toString();
- case "FLOAT":
- verify(v instanceof Double, "Expected Double, got %s", v.getClass());
- return v;
- case "BOOLEAN":
- verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
- return v;
- case "TIMESTAMP":
- // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
- // Strings with variable precision (up to six digits) to match the JSON files exported
- // by BigQuery.
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- Double doubleValue = ((Long) v) / 1000000.0;
- return formatTimestamp(doubleValue.toString());
- case "RECORD":
- verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
- return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Unexpected BigQuery field schema type %s for field named %s",
- fieldSchema.getType(),
- fieldSchema.getName()));
- }
- }
-
- @Nullable
- private static Object convertNullableField(
- Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
- // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
- verify(
- avroSchema.getType() == Type.UNION,
- "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
- avroSchema.getType(),
- fieldSchema.getName());
- List<Schema> unionTypes = avroSchema.getTypes();
- verify(
- unionTypes.size() == 2,
- "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
- fieldSchema.getName(),
- unionTypes);
-
- if (v == null) {
- return null;
- }
-
- Type firstType = unionTypes.get(0).getType();
- if (!firstType.equals(Type.NULL)) {
- return convertRequiredField(firstType, fieldSchema, v);
- }
- return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
- }
}
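
For reference, the formatTimestamp logic above prints the sub-second component with
variable precision by stripping trailing zeros from the microsecond value. A minimal,
self-contained sketch of just that trimming step (TrimMicrosDemo and fractional are
illustrative names, not part of this commit):

    import java.util.Locale;

    // Sketch of the fractional-second trimming in formatTimestamp: trailing
    // zeros are dropped, so precision varies from 1 to 6 digits.
    public class TrimMicrosDemo {
      static String fractional(int micros) {
        if (micros == 0) {
          return ""; // no sub-second component
        }
        int digits = 6;
        int subsecond = micros;
        while (subsecond % 10 == 0) { // strip trailing zeros
          digits--;
          subsecond /= 10;
        }
        return String.format(Locale.ROOT, ".%0" + digits + "d", subsecond);
      }

      public static void main(String[] args) {
        System.out.println(fractional(123456)); // .123456
        System.out.println(fractional(123450)); // .12345
        System.out.println(fractional(500000)); // .5
      }
    }
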
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
deleted file mode 100644
index 514e005..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.BigQueryOptions;
-
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * An interface for real, mock, or fake implementations of Cloud BigQuery services.
- */
-public interface BigQueryServices extends Serializable {
-
- /**
- * Returns a real, mock, or fake {@link JobService}.
- */
- public JobService getJobService(BigQueryOptions bqOptions);
-
- /**
- * Returns a real, mock, or fake {@link DatasetService}.
- */
- public DatasetService getDatasetService(BigQueryOptions bqOptions);
-
- /**
- * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables.
- */
- public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef);
-
- /**
- * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
- */
- public BigQueryJsonReader getReaderFromQuery(
- BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten);
-
- /**
- * An interface for the Cloud BigQuery job service (load, extract, and query jobs).
- */
- public interface JobService {
- /**
- * Start a BigQuery load job.
- */
- void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
- throws InterruptedException, IOException;
- /**
- * Start a BigQuery extract job.
- */
- void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
- throws InterruptedException, IOException;
-
- /**
- * Start a BigQuery query job.
- */
- void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
- throws IOException, InterruptedException;
-
- /**
- * Waits until the job is done, and returns the job.
- *
- * <p>Returns null if {@code maxAttempts} retries are reached.
- */
- Job pollJob(JobReference jobRef, int maxAttempts)
- throws InterruptedException, IOException;
-
- /**
- * Dry runs the query in the given project.
- */
- JobStatistics dryRunQuery(String projectId, String query)
- throws InterruptedException, IOException;
- }
-
- /**
- * An interface to get, create and delete Cloud BigQuery datasets and tables.
- */
- public interface DatasetService {
- /**
- * Gets the specified {@link Table} resource by table ID.
- */
- Table getTable(String projectId, String datasetId, String tableId)
- throws InterruptedException, IOException;
-
- /**
- * Deletes the table specified by tableId from the dataset.
- * If the table contains data, all the data will be deleted.
- */
- void deleteTable(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException;
-
- /**
- * Returns true if the table is empty.
- */
- boolean isTableEmpty(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException;
-
- /**
- * Gets the specified {@link Dataset} resource by dataset ID.
- */
- Dataset getDataset(String projectId, String datasetId)
- throws IOException, InterruptedException;
-
- /**
- * Create a {@link Dataset} with the given {@code location} and {@code description}.
- */
- void createDataset(String projectId, String datasetId, String location, String description)
- throws IOException, InterruptedException;
-
- /**
- * Deletes the dataset specified by the datasetId value.
- *
- * <p>Before you can delete a dataset, you must delete all its tables.
- */
- void deleteDataset(String projectId, String datasetId)
- throws IOException, InterruptedException;
- }
-
- /**
- * An interface to read Cloud BigQuery directly.
- */
- public interface BigQueryJsonReader {
- /**
- * Initializes the reader and advances the reader to the first record.
- */
- boolean start() throws IOException;
-
- /**
- * Advances the reader to the next valid record.
- */
- boolean advance() throws IOException;
-
- /**
- * Returns the value of the data item that was read by the last {@link #start} or
- * {@link #advance} call. The returned value must be effectively immutable and remain valid
- * indefinitely.
- *
- * <p>Multiple calls to this method without an intervening call to {@link #advance} should
- * return the same result.
- *
- * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
- * the last {@link #start} or {@link #advance} returned {@code false}.
- */
- TableRow getCurrent() throws NoSuchElementException;
-
- /**
- * Closes the reader. The reader cannot be used after this method is called.
- */
- void close() throws IOException;
- }
-}
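
The reader interface above follows Beam's usual source contract: start() opens the
reader and positions it on the first record, advance() steps forward, and getCurrent()
is only valid after a call that returned true. A minimal consumption sketch, assuming
the BigQueryJsonReader interface above is on the classpath (ReaderContractDemo and
readAll are illustrative names):

    import com.google.api.services.bigquery.model.TableRow;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class ReaderContractDemo {
      // Drains a reader into a list, honoring the start()/advance() contract.
      static List<TableRow> readAll(BigQueryServices.BigQueryJsonReader reader)
          throws IOException {
        List<TableRow> rows = new ArrayList<>();
        try {
          for (boolean more = reader.start(); more; more = reader.advance()) {
            rows.add(reader.getCurrent()); // valid only after a true start()/advance()
          }
        } finally {
          reader.close(); // the reader cannot be used after close()
        }
        return rows;
      }
    }
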
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
deleted file mode 100644
index 1aadeb2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.BigQueryOptions;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.NoSuchElementException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery
- * service.
- */
-public class BigQueryServicesImpl implements BigQueryServices {
-
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
-
- // The maximum number of attempts to execute a BigQuery RPC.
- private static final int MAX_RPC_ATTEMPTS = 10;
-
- // The initial backoff for executing a BigQuery RPC.
- private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
-
- // The initial backoff for polling the status of a BigQuery job.
- private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
-
- @Override
- public JobService getJobService(BigQueryOptions options) {
- return new JobServiceImpl(options);
- }
-
- @Override
- public DatasetService getDatasetService(BigQueryOptions options) {
- return new DatasetServiceImpl(options);
- }
-
- @Override
- public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
- return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef);
- }
-
- @Override
- public BigQueryJsonReader getReaderFromQuery(
- BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
- return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten);
- }
-
- @VisibleForTesting
- static class JobServiceImpl implements BigQueryServices.JobService {
- private final ApiErrorExtractor errorExtractor;
- private final Bigquery client;
-
- @VisibleForTesting
- JobServiceImpl(Bigquery client) {
- this.errorExtractor = new ApiErrorExtractor();
- this.client = client;
- }
-
- private JobServiceImpl(BigQueryOptions options) {
- this.errorExtractor = new ApiErrorExtractor();
- this.client = Transport.newBigQueryClient(options).build();
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public void startLoadJob(
- JobReference jobRef,
- JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
- Job job = new Job()
- .setJobReference(jobRef)
- .setConfiguration(new JobConfiguration().setLoad(loadConfig));
-
- startJob(job, errorExtractor, client);
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
- throws InterruptedException, IOException {
- Job job = new Job()
- .setJobReference(jobRef)
- .setConfiguration(
- new JobConfiguration().setExtract(extractConfig));
-
- startJob(job, errorExtractor, client);
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
- throws IOException, InterruptedException {
- Job job = new Job()
- .setJobReference(jobRef)
- .setConfiguration(
- new JobConfiguration().setQuery(queryConfig));
-
- startJob(job, errorExtractor, client);
- }
-
- private static void startJob(Job job,
- ApiErrorExtractor errorExtractor,
- Bigquery client) throws IOException, InterruptedException {
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
- }
-
- @VisibleForTesting
- static void startJob(
- Job job,
- ApiErrorExtractor errorExtractor,
- Bigquery client,
- Sleeper sleeper,
- BackOff backoff) throws IOException, InterruptedException {
- JobReference jobRef = job.getJobReference();
- Exception lastException = null;
- do {
- try {
- client.jobs().insert(jobRef.getProjectId(), job).execute();
- return; // SUCCEEDED
- } catch (GoogleJsonResponseException e) {
- if (errorExtractor.itemAlreadyExists(e)) {
- return; // SUCCEEDED
- }
- // ignore and retry
- LOG.warn("Ignore the error and retry inserting the job.", e);
- lastException = e;
- } catch (IOException e) {
- // ignore and retry
- LOG.warn("Ignore the error and retry inserting the job.", e);
- lastException = e;
- }
- } while (nextBackOff(sleeper, backoff));
- throw new IOException(
- String.format(
- "Unable to insert job: %s, aborting after %d .",
- jobRef.getJobId(), MAX_RPC_ATTEMPTS),
- lastException);
- }
-
- @Override
- public Job pollJob(JobReference jobRef, int maxAttempts)
- throws InterruptedException {
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
- return pollJob(jobRef, Sleeper.DEFAULT, backoff);
- }
-
- @VisibleForTesting
- Job pollJob(
- JobReference jobRef,
- Sleeper sleeper,
- BackOff backoff) throws InterruptedException {
- do {
- try {
- Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
- JobStatus status = job.getStatus();
- if (status != null && status.getState() != null && status.getState().equals("DONE")) {
- return job;
- }
- // The job is not DONE, wait longer and retry.
- } catch (IOException e) {
- // ignore and retry
- LOG.warn("Ignore the error and retry polling job status.", e);
- }
- } while (nextBackOff(sleeper, backoff));
- LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId());
- return null;
- }
-
- @Override
- public JobStatistics dryRunQuery(String projectId, String query)
- throws InterruptedException, IOException {
- Job job = new Job()
- .setConfiguration(new JobConfiguration()
- .setQuery(new JobConfigurationQuery()
- .setQuery(query))
- .setDryRun(true));
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- return executeWithRetries(
- client.jobs().insert(projectId, job),
- String.format(
- "Unable to dry run query: %s, aborting after %d retries.",
- query, MAX_RPC_ATTEMPTS),
- Sleeper.DEFAULT,
- backoff).getStatistics();
- }
- }
-
- @VisibleForTesting
- static class DatasetServiceImpl implements DatasetService {
- private final ApiErrorExtractor errorExtractor;
- private final Bigquery client;
-
- @VisibleForTesting
- DatasetServiceImpl(Bigquery client) {
- this.errorExtractor = new ApiErrorExtractor();
- this.client = client;
- }
-
- private DatasetServiceImpl(BigQueryOptions bqOptions) {
- this.errorExtractor = new ApiErrorExtractor();
- this.client = Transport.newBigQueryClient(bqOptions).build();
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public Table getTable(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException {
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- return executeWithRetries(
- client.tables().get(projectId, datasetId, tableId),
- String.format(
- "Unable to get table: %s, aborting after %d retries.",
- tableId, MAX_RPC_ATTEMPTS),
- Sleeper.DEFAULT,
- backoff);
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public void deleteTable(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException {
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- executeWithRetries(
- client.tables().delete(projectId, datasetId, tableId),
- String.format(
- "Unable to delete table: %s, aborting after %d retries.",
- tableId, MAX_RPC_ATTEMPTS),
- Sleeper.DEFAULT,
- backoff);
- }
-
- @Override
- public boolean isTableEmpty(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException {
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- TableDataList dataList = executeWithRetries(
- client.tabledata().list(projectId, datasetId, tableId),
- String.format(
- "Unable to list table data: %s, aborting after %d retries.",
- tableId, MAX_RPC_ATTEMPTS),
- Sleeper.DEFAULT,
- backoff);
- return dataList.getRows() == null || dataList.getRows().isEmpty();
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public Dataset getDataset(String projectId, String datasetId)
- throws IOException, InterruptedException {
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- return executeWithRetries(
- client.datasets().get(projectId, datasetId),
- String.format(
- "Unable to get dataset: %s, aborting after %d retries.",
- datasetId, MAX_RPC_ATTEMPTS),
- Sleeper.DEFAULT,
- backoff);
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public void createDataset(
- String projectId, String datasetId, String location, String description)
- throws IOException, InterruptedException {
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
- }
-
- @VisibleForTesting
- void createDataset(
- String projectId,
- String datasetId,
- String location,
- String description,
- Sleeper sleeper,
- BackOff backoff) throws IOException, InterruptedException {
- DatasetReference datasetRef = new DatasetReference()
- .setProjectId(projectId)
- .setDatasetId(datasetId);
-
- Dataset dataset = new Dataset()
- .setDatasetReference(datasetRef)
- .setLocation(location)
- .setFriendlyName(location)
- .setDescription(description);
-
- Exception lastException;
- do {
- try {
- client.datasets().insert(projectId, dataset).execute();
- return; // SUCCEEDED
- } catch (GoogleJsonResponseException e) {
- if (errorExtractor.itemAlreadyExists(e)) {
- return; // SUCCEEDED
- }
- // ignore and retry
- LOG.warn("Ignore the error and retry creating the dataset.", e);
- lastException = e;
- } catch (IOException e) {
- LOG.warn("Ignore the error and retry creating the dataset.", e);
- lastException = e;
- }
- } while (nextBackOff(sleeper, backoff));
- throw new IOException(
- String.format(
- "Unable to create dataset: %s, aborting after %d .",
- datasetId, MAX_RPC_ATTEMPTS),
- lastException);
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
- *
- * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
- */
- @Override
- public void deleteDataset(String projectId, String datasetId)
- throws IOException, InterruptedException {
- BackOff backoff =
- new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
- executeWithRetries(
- client.datasets().delete(projectId, datasetId),
- String.format(
- "Unable to delete table: %s, aborting after %d retries.",
- datasetId, MAX_RPC_ATTEMPTS),
- Sleeper.DEFAULT,
- backoff);
- }
- }
-
- private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
- BigQueryTableRowIterator iterator;
-
- private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
- this.iterator = iterator;
- }
-
- private static BigQueryJsonReader fromQuery(
- BigQueryOptions bqOptions,
- String query,
- String projectId,
- @Nullable Boolean flattenResults) {
- return new BigQueryJsonReaderImpl(
- BigQueryTableRowIterator.fromQuery(
- query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults));
- }
-
- private static BigQueryJsonReader fromTable(
- BigQueryOptions bqOptions,
- TableReference tableRef) {
- return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
- tableRef, Transport.newBigQueryClient(bqOptions).build()));
- }
-
- @Override
- public boolean start() throws IOException {
- try {
- iterator.open();
- return iterator.advance();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted during start() operation", e);
- }
- }
-
- @Override
- public boolean advance() throws IOException {
- try {
- return iterator.advance();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted during advance() operation", e);
- }
- }
-
- @Override
- public TableRow getCurrent() throws NoSuchElementException {
- return iterator.getCurrent();
- }
-
- @Override
- public void close() throws IOException {
- iterator.close();
- }
- }
-
- @VisibleForTesting
- static <T> T executeWithRetries(
- AbstractGoogleClientRequest<T> request,
- String errorMessage,
- Sleeper sleeper,
- BackOff backoff)
- throws IOException, InterruptedException {
- Exception lastException = null;
- do {
- try {
- return request.execute();
- } catch (IOException e) {
- LOG.warn("Ignore the error and retry the request.", e);
- lastException = e;
- }
- } while (nextBackOff(sleeper, backoff));
- throw new IOException(
- errorMessage,
- lastException);
- }
-
- /**
- * Identical to {@link BackOffUtils#next} but without checked IOException.
- * @throws InterruptedException if interrupted while sleeping between retries.
- */
- private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
- try {
- return BackOffUtils.next(sleeper, backoff);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-}
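
All of the RPC wrappers above share one retry shape: execute the request, log and
remember the failure, sleep per the backoff, and give up once the attempt budget is
spent. A self-contained sketch of that shape with a plain Callable standing in for the
BigQuery request (RetryDemo and withRetries are illustrative; the deleted code injects
Sleeper and BackOff instead, so retries are testable without real sleeping):

    import java.io.IOException;
    import java.util.concurrent.Callable;

    class RetryDemo {
      // Attempt-bounded retry with exponential backoff, as in executeWithRetries.
      static <T> T withRetries(Callable<T> call, int maxAttempts, long initialBackoffMillis)
          throws IOException, InterruptedException {
        IOException last = null;
        long backoffMillis = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
            return call.call();
          } catch (IOException e) {
            last = e; // ignore and retry, as in the deleted code
          } catch (Exception e) {
            throw new RuntimeException(e); // Callable.call() declares Exception
          }
          if (attempt < maxAttempts) {
            Thread.sleep(backoffMillis);
            backoffMillis *= 2; // exponential growth between attempts
          }
        }
        throw new IOException("Aborting after " + maxAttempts + " attempts.", last);
      }
    }
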
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
deleted file mode 100644
index 84004a7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * Inserts rows into BigQuery.
- */
-public class BigQueryTableInserter {
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
-
- // Approximate amount of table data to upload per InsertAll request.
- private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
-
- // The maximum number of rows to upload per InsertAll request.
- private static final long MAX_ROWS_PER_BATCH = 500;
-
- // The maximum number of times to retry inserting rows into BigQuery.
- private static final int MAX_INSERT_ATTEMPTS = 5;
-
- // The initial backoff after a failure inserting rows into BigQuery.
- private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
-
- // Backoff time bounds for rate limit exceeded errors.
- private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
- private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
-
- private final Bigquery client;
- private final TableReference defaultRef;
- private final long maxRowsPerBatch;
-
- private ExecutorService executor;
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- * @param options a PipelineOptions object
- */
- public BigQueryTableInserter(Bigquery client, PipelineOptions options) {
- this.client = client;
- this.defaultRef = null;
- this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
- this.executor = options.as(GcsOptions.class).getExecutorService();
- }
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- * @param options a PipelineOptions object
- * @param defaultRef identifies the table to insert into
- * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions)}
- */
- @Deprecated
- public BigQueryTableInserter(Bigquery client, PipelineOptions options,
- TableReference defaultRef) {
- this.client = client;
- this.defaultRef = defaultRef;
- this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
- this.executor = options.as(GcsOptions.class).getExecutorService();
- }
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- * @param options a PipelineOptions object
- * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
- */
- public BigQueryTableInserter(Bigquery client, PipelineOptions options,
- int maxRowsPerBatch) {
- this.client = client;
- this.defaultRef = null;
- this.maxRowsPerBatch = maxRowsPerBatch;
- this.executor = options.as(GcsOptions.class).getExecutorService();
- }
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- * @param defaultRef identifies the default table to insert into
- * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions, int)}
- */
- @Deprecated
- public BigQueryTableInserter(Bigquery client, PipelineOptions options,
- TableReference defaultRef, int maxRowsPerBatch) {
- this.client = client;
- this.defaultRef = defaultRef;
- this.maxRowsPerBatch = maxRowsPerBatch;
- this.executor = options.as(GcsOptions.class).getExecutorService();
- }
-
- /**
- * Insert all rows from the given list.
- *
- * @deprecated replaced by {@link #insertAll(TableReference, List)}
- */
- @Deprecated
- public void insertAll(List<TableRow> rowList) throws IOException {
- insertAll(defaultRef, rowList, null, null);
- }
-
- /**
- * Insert all rows from the given list using specified insertIds if not null.
- *
- * @deprecated replaced by {@link #insertAll(TableReference, List, List)}
- */
- @Deprecated
- public void insertAll(List<TableRow> rowList,
- @Nullable List<String> insertIdList) throws IOException {
- insertAll(defaultRef, rowList, insertIdList, null);
- }
-
- /**
- * Insert all rows from the given list.
- */
- public void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
- insertAll(ref, rowList, null, null);
- }
-
- /**
- * Insert all rows from the given list using specified insertIds if not null. Track count of
- * bytes written with the Aggregator.
- */
- public void insertAll(TableReference ref, List<TableRow> rowList,
- @Nullable List<String> insertIdList, Aggregator<Long, Long> byteCountAggregator)
- throws IOException {
- checkNotNull(ref, "ref");
- if (insertIdList != null && rowList.size() != insertIdList.size()) {
- throw new AssertionError("If insertIdList is not null it needs to have exactly "
- + "as many elements as rowList");
- }
-
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_INSERT_ATTEMPTS,
- INITIAL_INSERT_BACKOFF_INTERVAL_MS);
-
- List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
- // These lists contain the rows to publish. Initially they contain the entire list. If there are
- // failures, they will contain only the failed rows to be retried.
- List<TableRow> rowsToPublish = rowList;
- List<String> idsToPublish = insertIdList;
- while (true) {
- List<TableRow> retryRows = new ArrayList<>();
- List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
-
- int strideIndex = 0;
- // Upload in batches.
- List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
- int dataSize = 0;
-
- List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
- List<Integer> strideIndices = new ArrayList<>();
-
- for (int i = 0; i < rowsToPublish.size(); ++i) {
- TableRow row = rowsToPublish.get(i);
- TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
- if (idsToPublish != null) {
- out.setInsertId(idsToPublish.get(i));
- }
- out.setJson(row.getUnknownKeys());
- rows.add(out);
-
- dataSize += row.toString().length();
- if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
- || i == rowsToPublish.size() - 1) {
- TableDataInsertAllRequest content = new TableDataInsertAllRequest();
- content.setRows(rows);
-
- final Bigquery.Tabledata.InsertAll insert = client.tabledata()
- .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
- content);
-
- futures.add(
- executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
- @Override
- public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
- BackOff backoff = new IntervalBoundedExponentialBackOff(
- MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
- while (true) {
- try {
- return insert.execute().getInsertErrors();
- } catch (IOException e) {
- if (new ApiErrorExtractor().rateLimited(e)) {
- LOG.info("BigQuery insertAll exceeded rate limit, retrying");
- try {
- Thread.sleep(backoff.nextBackOffMillis());
- } catch (InterruptedException interrupted) {
- throw new IOException(
- "Interrupted while waiting before retrying insertAll");
- }
- } else {
- throw e;
- }
- }
- }
- }
- }));
- strideIndices.add(strideIndex);
-
- if (byteCountAggregator != null) {
- byteCountAggregator.addValue(Long.valueOf(dataSize));
- }
- dataSize = 0;
- strideIndex = i + 1;
- rows = new LinkedList<>();
- }
- }
-
- try {
- for (int i = 0; i < futures.size(); i++) {
- List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
- if (errors != null) {
- for (TableDataInsertAllResponse.InsertErrors error : errors) {
- allErrors.add(error);
- if (error.getIndex() == null) {
- throw new IOException("Insert failed: " + allErrors);
- }
-
- int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
- retryRows.add(rowsToPublish.get(errorIndex));
- if (retryIds != null) {
- retryIds.add(idsToPublish.get(errorIndex));
- }
- }
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while inserting " + rowsToPublish);
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
- }
-
- if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
- try {
- Thread.sleep(backoff.nextBackOffMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
- }
- LOG.info("Retrying failed inserts to BigQuery");
- rowsToPublish = retryRows;
- idsToPublish = retryIds;
- allErrors.clear();
- } else {
- break;
- }
- }
- if (!allErrors.isEmpty()) {
- throw new IOException("Insert failed: " + allErrors);
- }
- }
-
- /**
- * Retrieves or creates the table.
- *
- * <p>The table is checked to conform to insertion requirements as specified
- * by WriteDisposition and CreateDisposition.
- *
- * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
- * this will re-create the table if necessary to ensure it is empty.
- *
- * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
- * will fail if the table exists and is not empty.
- *
- * <p>When constructing a table, a {@code TableSchema} must be available. If a
- * schema is provided, then it will be used. If no schema is provided, but
- * an existing table is being cleared (WRITE_TRUNCATE option above), then
- * the existing schema will be re-used. If no schema is available, then an
- * {@code IOException} is thrown.
- */
- public Table getOrCreateTable(
- TableReference ref,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- @Nullable TableSchema schema) throws IOException {
- // Check if table already exists.
- Bigquery.Tables.Get get = client.tables()
- .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- Table table = null;
- try {
- table = get.execute();
- } catch (IOException e) {
- ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
- if (!errorExtractor.itemNotFound(e)
- || createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
- // Rethrow.
- throw e;
- }
- }
-
- // If we want an empty table, and it isn't, then delete it first.
- if (table != null) {
- if (writeDisposition == WriteDisposition.WRITE_APPEND) {
- return table;
- }
-
- boolean empty = isEmpty(ref);
- if (empty) {
- if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
- LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
- }
- return table;
-
- } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
- throw new IOException("WriteDisposition is WRITE_EMPTY, "
- + "but table is not empty");
- }
-
- // Reuse the existing schema if none was provided.
- if (schema == null) {
- schema = table.getSchema();
- }
-
- // Delete table and fall through to re-creating it below.
- LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
- Bigquery.Tables.Delete delete = client.tables()
- .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- delete.execute();
- }
-
- if (schema == null) {
- throw new IllegalArgumentException(
- "Table schema required for new table.");
- }
-
- // Create the table.
- return tryCreateTable(ref, schema);
- }
-
- /**
- * Checks if a table is empty.
- */
- public boolean isEmpty(TableReference ref) throws IOException {
- Bigquery.Tabledata.List list = client.tabledata()
- .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- list.setMaxResults(1L);
- TableDataList dataList = list.execute();
-
- return dataList.getRows() == null || dataList.getRows().isEmpty();
- }
-
- /**
- * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
- * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
- * configured with a table spec function to use different tables for each window.
- */
- private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
-
- /**
- * Tries to create the BigQuery table.
- * If a table with the same name already exists in the dataset, the table
- * creation fails, and the function returns null. In such a case,
- * the existing table doesn't necessarily have the same schema as specified
- * by the parameter.
- *
- * @param schema Schema of the new BigQuery table.
- * @return The newly created BigQuery table information, or null if the table
- * with the same name already exists.
- * @throws IOException if other error than already existing table occurs.
- */
- @Nullable
- public Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
- LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
- BackOff backoff =
- new ExponentialBackOff.Builder()
- .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
- .build();
-
- Table table = new Table().setTableReference(ref).setSchema(schema);
- return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
- }
-
- @VisibleForTesting
- @Nullable
- Table tryCreateTable(
- Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
- throws IOException {
- boolean retry = false;
- while (true) {
- try {
- return client.tables().insert(projectId, datasetId, table).execute();
- } catch (IOException e) {
- ApiErrorExtractor extractor = new ApiErrorExtractor();
- if (extractor.itemAlreadyExists(e)) {
- // The table already exists, nothing to return.
- return null;
- } else if (extractor.rateLimited(e)) {
- // The request failed because we hit a temporary quota. Back off and try again.
- try {
- if (BackOffUtils.next(sleeper, backoff)) {
- if (!retry) {
- LOG.info(
- "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
- projectId,
- datasetId,
- table.getTableReference().getTableId(),
- TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
- retry = true;
- }
- continue;
- }
- } catch (InterruptedException e1) {
- // Restore interrupted state and throw the last failure.
- Thread.currentThread().interrupt();
- throw e;
- }
- }
- throw e;
- }
- }
- }
-}
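
The batching rule in insertAll above flushes a request once either limit trips: the
accumulated size estimate reaches UPLOAD_BATCH_SIZE_BYTES or the row count reaches
maxRowsPerBatch, with any remainder flushed together with the final row. A minimal
sketch of that flush condition alone (BatchingDemo and the Consumer callback are
illustrative; the deleted code submits each batch to an ExecutorService instead):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    class BatchingDemo {
      static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
      static final long MAX_ROWS_PER_BATCH = 500;

      // Groups rows into batches, flushing on whichever limit is hit first.
      static void batchRows(List<String> rows, Consumer<List<String>> flush) {
        List<String> batch = new ArrayList<>();
        long dataSize = 0;
        for (int i = 0; i < rows.size(); i++) {
          batch.add(rows.get(i));
          dataSize += rows.get(i).length(); // stands in for row.toString().length()
          if (dataSize >= UPLOAD_BATCH_SIZE_BYTES
              || batch.size() >= MAX_ROWS_PER_BATCH
              || i == rows.size() - 1) {
            flush.accept(batch);
            batch = new ArrayList<>();
            dataSize = 0;
          }
        }
      }
    }
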
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
deleted file mode 100644
index 3865654..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ClassInfo;
-import com.google.api.client.util.Data;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * Iterates over all rows in a table.
- */
-public class BigQueryTableRowIterator implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class);
-
- @Nullable private TableReference ref;
- @Nullable private final String projectId;
- @Nullable private TableSchema schema;
- private final Bigquery client;
- private String pageToken;
- private Iterator<TableRow> iteratorOverCurrentBatch;
- private TableRow current;
- // Set true when the final page is seen from the service.
- private boolean lastPage = false;
-
- // The maximum number of times a BigQuery request will be retried
- private static final int MAX_RETRIES = 3;
- // Initial wait time for the backoff implementation
- private static final Duration INITIAL_BACKOFF_TIME = Duration.standardSeconds(1);
-
- // After sending a query to the BQ service, poll the status of the query execution job
- // at the following interval.
- private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
-
- private final String query;
- // Whether to flatten query results.
- private final boolean flattenResults;
- // Temporary dataset used to store query results.
- private String temporaryDatasetId = null;
- // Temporary table used to store query results.
- private String temporaryTableId = null;
-
- private BigQueryTableRowIterator(
- @Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
- Bigquery client, boolean flattenResults) {
- this.ref = ref;
- this.query = query;
- this.projectId = projectId;
- this.client = checkNotNull(client, "client");
- this.flattenResults = flattenResults;
- }
-
- /**
- * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
- */
- public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
- checkNotNull(ref, "ref");
- checkNotNull(client, "client");
- return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true);
- }
-
- /**
- * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
- * specified query in the specified project.
- */
- public static BigQueryTableRowIterator fromQuery(
- String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) {
- checkNotNull(query, "query");
- checkNotNull(projectId, "projectId");
- checkNotNull(client, "client");
- return new BigQueryTableRowIterator(null, query, projectId, client,
- MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
- }
-
- /**
- * Opens the table for read.
- * @throws IOException on failure
- */
- public void open() throws IOException, InterruptedException {
- if (query != null) {
- ref = executeQueryAndWaitForCompletion();
- }
- // Get table schema.
- Bigquery.Tables.Get get =
- client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-
- Table table =
- executeWithBackOff(
- get,
- "Error opening BigQuery table %s of dataset %s : {}",
- ref.getTableId(),
- ref.getDatasetId());
- schema = table.getSchema();
- }
-
- public boolean advance() throws IOException, InterruptedException {
- while (true) {
- if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
- // Embed schema information into the raw row, so that values have an
- // associated key. This matches how rows are read when using the
- // DataflowRunner.
- current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
- return true;
- }
- if (lastPage) {
- return false;
- }
-
- Bigquery.Tabledata.List list =
- client.tabledata().list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- if (pageToken != null) {
- list.setPageToken(pageToken);
- }
-
- TableDataList result =
- executeWithBackOff(
- list,
- "Error reading from BigQuery table %s of dataset %s : {}",
- ref.getTableId(),
- ref.getDatasetId());
-
- pageToken = result.getPageToken();
- iteratorOverCurrentBatch =
- result.getRows() != null
- ? result.getRows().iterator()
- : Collections.<TableRow>emptyIterator();
-
- // The server may return a page token indefinitely on a zero-length table.
- if (pageToken == null || result.getTotalRows() != null && result.getTotalRows() == 0) {
- lastPage = true;
- }
- }
- }
-
- public TableRow getCurrent() {
- if (current == null) {
- throw new NoSuchElementException();
- }
- return current;
- }
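For orientation, a minimal usage sketch of this iterator, assuming an authorized Bigquery client and a TableReference are already in hand (the names tableRef and bigqueryClient below are hypothetical):

    BigQueryTableRowIterator it =
        BigQueryTableRowIterator.fromTable(tableRef, bigqueryClient);
    try {
      it.open();
      while (it.advance()) {
        TableRow row = it.getCurrent();  // typed per the schema; see getTypedCellValue below
        System.out.println(row);
      }
    } finally {
      it.close();  // also cleans up any temporary dataset created in query mode
    }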
-
- /**
- * Adjusts a field returned from the BigQuery API to match what we will receive when running
- * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
- * used for batch jobs executed on the Cloud Dataflow service.
- *
- * <p>The following is the relationship between BigQuery schema and Java types:
- *
- * <ul>
- * <li>Nulls are {@code null}.
- * <li>Repeated fields are {@code List} of objects.
- * <li>Record columns are {@link TableRow} objects.
- * <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
- * <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
- * <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
- * {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
- * zeros and can be 1 to 6 digits long.
- * <li>Every other atomic type is a {@code String}.
- * </ul>
- *
- * <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
- *
- * <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
- * and are not accessible through the {@link TableRow#getF} function.
- */
- @Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
- if (Data.isNull(v)) {
- return null;
- }
-
- if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
- TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
- ImmutableList.Builder<Object> values = ImmutableList.builder();
- for (Map<String, Object> element : rawCells) {
- values.add(getTypedCellValue(elementSchema, element.get("v")));
- }
- return values.build();
- }
-
- if (fieldSchema.getType().equals("RECORD")) {
- @SuppressWarnings("unchecked")
- Map<String, Object> typedV = (Map<String, Object>) v;
- return getTypedTableRow(fieldSchema.getFields(), typedV);
- }
-
- if (fieldSchema.getType().equals("FLOAT")) {
- return Double.parseDouble((String) v);
- }
-
- if (fieldSchema.getType().equals("BOOLEAN")) {
- return Boolean.parseBoolean((String) v);
- }
-
- if (fieldSchema.getType().equals("TIMESTAMP")) {
- return AvroUtils.formatTimestamp((String) v);
- }
-
- return v;
- }
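The parsing above relies on the JSON API returning every atomic value as a String. A standalone sketch of the same conversions (it does not call the private method; the values are illustrative):

    String rawFloat = "3.14";                           // FLOAT arrives as a JSON string
    Double typedFloat = Double.parseDouble(rawFloat);   // -> 3.14 as a Java Double
    String rawBool = "true";                            // BOOLEAN arrives as a JSON string
    Boolean typedBool = Boolean.parseBoolean(rawBool);  // -> Boolean.TRUE
    // TIMESTAMP strings such as "1.452062291E9" are reformatted by
    // AvroUtils.formatTimestamp into the "yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC" form.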
-
- /**
- * A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
- * because they are reserved keywords in {@link TableRow}.
- */
- // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
- // not indirect through our broken use of {@link TableRow}.
- // See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
- private static final Collection<String> RESERVED_FIELD_NAMES =
- ClassInfo.of(TableRow.class).getNames();
-
- /**
- * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
- * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
- * the cells are converted to Java types according to the provided field schemas.
- *
- * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
- * types are mapped to Java types.
- */
- private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
- // If rawRow is a TableRow, use it. If not, create a new one.
- TableRow row;
- List<? extends Map<String, Object>> cells;
- if (rawRow instanceof TableRow) {
- // Since rawRow is a TableRow, it already has TableCell objects set via setF. We do not need
- // to do any type conversion, but we extract the cells for cell-wise processing below.
- row = (TableRow) rawRow;
- cells = row.getF();
- // Clear the cells from the row, so that row.getF() will return null. This matches the
- // behavior of rows produced by the BigQuery export API used on the service.
- row.setF(null);
- } else {
- row = new TableRow();
-
- // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
- // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
- // we will use Map.get("v") instead of TableCell.getV() to get its value.
- @SuppressWarnings("unchecked")
- List<? extends Map<String, Object>> rawCells =
- (List<? extends Map<String, Object>>) rawRow.get("f");
- cells = rawCells;
- }
-
- checkState(cells.size() == fields.size(),
- "Expected that the row has the same number of cells %s as fields in the schema %s",
- cells.size(), fields.size());
-
- // Loop through all the fields in the row, normalizing their types with the TableFieldSchema
- // and storing the normalized values by field name in the Map<String, Object> that
- // underlies the TableRow.
- Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
- Iterator<TableFieldSchema> fieldIt = fields.iterator();
- while (cellIt.hasNext()) {
- Map<String, Object> cell = cellIt.next();
- TableFieldSchema fieldSchema = fieldIt.next();
-
- // Convert the object in this cell to the Java type corresponding to its type in the schema.
- Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
-
- String fieldName = fieldSchema.getName();
- checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
- "BigQueryIO does not support records with columns named %s", fieldName);
-
- if (convertedValue == null) {
- // BigQuery does not include null values when the export operation (to JSON) is used.
- // To match that behavior, BigQueryTableRowIterator (and hence the DirectRunner)
- // intentionally omits columns with null values.
- continue;
- }
-
- row.set(fieldName, convertedValue);
- }
- return row;
- }
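For reference, the raw shape this method consumes is the JSON API's positional {"f": [{"v": ...}, ...]} encoding. A sketch with hypothetical field names:

    // Raw row from tabledata.list:  {"f": [{"v": "42"}, {"v": "true"}]}
    // Schema fields (in order):     count INTEGER, active BOOLEAN
    // Resulting TableRow:           {"count": "42", "active": true}
    // (INTEGER stays a String, matching the exported-JSON convention noted above.)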
-
- // Create a new BigQuery dataset.
- private void createDataset(String datasetId) throws IOException, InterruptedException {
- Dataset dataset = new Dataset();
- DatasetReference reference = new DatasetReference();
- reference.setProjectId(projectId);
- reference.setDatasetId(datasetId);
- dataset.setDatasetReference(reference);
-
- String createDatasetError =
- "Error when trying to create the temporary dataset " + datasetId + " in project "
- + projectId;
- executeWithBackOff(
- client.datasets().insert(projectId, dataset), createDatasetError + " :{}");
- }
-
- // Delete the given table in the given dataset.
- private void deleteTable(String datasetId, String tableId)
- throws IOException, InterruptedException {
- executeWithBackOff(
- client.tables().delete(projectId, datasetId, tableId),
- "Error when trying to delete the temporary table " + tableId + " in dataset " + datasetId
- + " of project " + projectId + ". Manual deletion may be required. Error message : {}");
- }
-
- // Delete the given dataset. This will fail if the given dataset has any tables.
- private void deleteDataset(String datasetId) throws IOException, InterruptedException {
- executeWithBackOff(
- client.datasets().delete(projectId, datasetId),
- "Error when trying to delete the temporary dataset " + datasetId + " in project "
- + projectId + ". Manual deletion may be required. Error message : {}");
- }
-
- /**
- * Executes the specified query and returns a reference to the temporary BigQuery table created
- * to hold the results.
- *
- * @throws IOException if the query fails.
- */
- private TableReference executeQueryAndWaitForCompletion()
- throws IOException, InterruptedException {
- // Create a temporary dataset to store results.
- // The dataset name starts with an "_" so that it is hidden.
- Random rnd = new Random(System.currentTimeMillis());
- temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
- temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
-
- createDataset(temporaryDatasetId);
- Job job = new Job();
- JobConfiguration config = new JobConfiguration();
- JobConfigurationQuery queryConfig = new JobConfigurationQuery();
- config.setQuery(queryConfig);
- job.setConfiguration(config);
- queryConfig.setQuery(query);
- queryConfig.setAllowLargeResults(true);
- queryConfig.setFlattenResults(flattenResults);
-
- TableReference destinationTable = new TableReference();
- destinationTable.setProjectId(projectId);
- destinationTable.setDatasetId(temporaryDatasetId);
- destinationTable.setTableId(temporaryTableId);
- queryConfig.setDestinationTable(destinationTable);
-
- Insert insert = client.jobs().insert(projectId, job);
- Job queryJob = executeWithBackOff(
- insert, "Error when trying to execute the job for query " + query + " :{}");
- JobReference jobId = queryJob.getJobReference();
-
- while (true) {
- Job pollJob = executeWithBackOff(
- client.jobs().get(projectId, jobId.getJobId()),
- "Error when trying to get status of the job for query " + query + " :{}");
- JobStatus status = pollJob.getStatus();
- if (status.getState().equals("DONE")) {
- // Job is DONE, but did not necessarily succeed.
- ErrorProto error = status.getErrorResult();
- if (error == null) {
- return pollJob.getConfiguration().getQuery().getDestinationTable();
- } else {
- // There will be no temporary table to delete, so null out the reference.
- temporaryTableId = null;
- throw new IOException("Executing query " + query + " failed: " + error.getMessage());
- }
- }
- Uninterruptibles.sleepUninterruptibly(
- QUERY_COMPLETION_POLL_TIME.getMillis(), TimeUnit.MILLISECONDS);
- }
- }
-
- // Execute a BQ request with exponential backoff and return the result.
- // client - BQ request to be executed
- // error - Formatted message to log when a request fails. Takes the exception message as a
- // formatter parameter.
- public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
- Object... errorArgs) throws IOException, InterruptedException {
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backOff =
- new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
-
- T result = null;
- while (true) {
- try {
- result = client.execute();
- break;
- } catch (IOException e) {
- LOG.error(String.format(error, errorArgs), e.getMessage());
- if (!BackOffUtils.next(sleeper, backOff)) {
- LOG.error(
- String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times.");
- throw e;
- }
- }
- }
-
- return result;
- }
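A usage sketch, with a dataset-list request standing in for an arbitrary AbstractGoogleClientRequest (the project id is hypothetical; note the trailing " : {}" placeholder that the logger fills with the exception message, matching the error strings used throughout this class):

    DatasetList datasets = executeWithBackOff(
        client.datasets().list("my-project"),
        "Error listing datasets in project %s : {}",
        "my-project");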
-
- @Override
- public void close() {
- // Prevent any further requests.
- lastPage = true;
-
- try {
- // Delete the temporary table and dataset that get generated when executing a query.
- if (temporaryDatasetId != null) {
- if (temporaryTableId != null) {
- deleteTable(temporaryDatasetId, temporaryTableId);
- }
- deleteDataset(temporaryDatasetId);
- }
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException(e);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index a616f5a..ca3f0ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -19,13 +19,12 @@ package org.apache.beam.sdk.values;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -40,19 +39,15 @@ import org.apache.beam.sdk.util.WindowingStrategy;
* be passed as the inputs of other PTransforms.
*
* <p>Some root transforms produce bounded {@code PCollections} and others
- * produce unbounded ones. For example, {@link TextIO.Read} reads a static set
- * of files, so it produces a bounded {@link PCollection}.
- * {@link PubsubIO.Read}, on the other hand, receives a potentially infinite stream
- * of Pubsub messages, so it produces an unbounded {@link PCollection}.
+ * produce unbounded ones. For example, {@link CountingInput#upTo} produces a fixed set of integers,
+ * so it produces a bounded {@link PCollection}. {@link CountingInput#unbounded} produces all
+ * integers as an infinite stream, so it produces an unbounded {@link PCollection}.
*
- * <p>Each element in a {@link PCollection} may have an associated implicit
- * timestamp. Readers assign timestamps to elements when they create
- * {@link PCollection PCollections}, and other {@link PTransform PTransforms} propagate these
- * timestamps from their input to their output. For example, {@link PubsubIO.Read}
- * assigns pubsub message timestamps to elements, and {@link TextIO.Read} assigns
- * the default value {@link BoundedWindow#TIMESTAMP_MIN_VALUE} to elements. User code can
- * explicitly assign timestamps to elements with
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#outputWithTimestamp}.
+ * <p>Each element in a {@link PCollection} has an associated timestamp. Readers assign timestamps
+ * to elements when they create {@link PCollection PCollections}, and other
+ * {@link PTransform PTransforms} propagate these timestamps from their input to their output. See
+ * the documentation on {@link BoundedReader} and {@link UnboundedReader} for more information on
+ * how these readers produce timestamps and watermarks.
*
* <p>Additionally, a {@link PCollection} has an associated
* {@link WindowFn} and each element is assigned to a set of windows.
@@ -73,14 +68,11 @@ public class PCollection<T> extends TypedPValue<T> {
*/
public enum IsBounded {
/**
- * Indicates that a {@link PCollection} contains bounded data elements, such as
- * {@link PCollection PCollections} from {@link TextIO}, {@link BigQueryIO},
- * {@link Create} e.t.c.
+ * Indicates that a {@link PCollection} contains a bounded number of elements.
*/
BOUNDED,
/**
- * Indicates that a {@link PCollection} contains unbounded data elements, such as
- * {@link PCollection PCollections} from {@link PubsubIO}.
+ * Indicates that a {@link PCollection} contains an unbounded number of elements.
*/
UNBOUNDED;
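A minimal sketch of the distinction described above, assuming a Pipeline p:

    PCollection<Long> bounded = p.apply(CountingInput.upTo(1000));    // IsBounded.BOUNDED
    PCollection<Long> streamed = p.apply(CountingInput.unbounded());  // IsBounded.UNBOUNDED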