You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:03:02 UTC

[07/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
index df2308d..16fc6fa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
@@ -17,34 +17,15 @@
  */
 package org.apache.beam.sdk.util;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.util.Arrays;
-import java.util.List;
-
-import javax.annotation.Nullable;
 
 /**
  * A set of utilities for working with Avro files.
@@ -154,192 +135,4 @@ public class AvroUtils {
     }
     return new AvroMetadata(syncMarker, codec, schemaString);
   }
-
-  /**
-   * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
-   * immutable.
-   */
-  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-  // Package private for BigQueryTableRowIterator to use.
-  static String formatTimestamp(String timestamp) {
-    // timestamp is in "seconds since epoch" format, with scientific notation.
-    // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
-    // Separate into seconds and microseconds.
-    double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
-    long timestampMicros = (long) timestampDoubleMicros;
-    long seconds = timestampMicros / 1000000;
-    int micros = (int) (timestampMicros % 1000000);
-    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
-
-    // No sub-second component.
-    if (micros == 0) {
-      return String.format("%s UTC", dayAndTime);
-    }
-
-    // Sub-second component.
-    int digits = 6;
-    int subsecond = micros;
-    while (subsecond % 10 == 0) {
-      digits--;
-      subsecond /= 10;
-    }
-    String formatString = String.format("%%0%dd", digits);
-    String fractionalSeconds = String.format(formatString, subsecond);
-    return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
-  }
-
-  /**
-   * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
-   *
-   * See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">
-   * "Avro format"</a> for more information.
-   */
-  public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
-    return convertGenericRecordToTableRow(record, schema.getFields());
-  }
-
-  private static TableRow convertGenericRecordToTableRow(
-      GenericRecord record, List<TableFieldSchema> fields) {
-    TableRow row = new TableRow();
-    for (TableFieldSchema subSchema : fields) {
-      // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
-      // is required, so it may not be null.
-      Field field = record.getSchema().getField(subSchema.getName());
-      Object convertedValue =
-          getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
-      if (convertedValue != null) {
-        // To match the JSON files exported by BigQuery, do not include null values in the output.
-        row.set(field.name(), convertedValue);
-      }
-    }
-    return row;
-  }
-
-  @Nullable
-  private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
-    // is optional (and so it may be null), but defaults to "NULLABLE".
-    String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
-    switch (mode) {
-      case "REQUIRED":
-        return convertRequiredField(schema.getType(), fieldSchema, v);
-      case "REPEATED":
-        return convertRepeatedField(schema, fieldSchema, v);
-      case "NULLABLE":
-        return convertNullableField(schema, fieldSchema, v);
-      default:
-        throw new UnsupportedOperationException(
-            "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
-    }
-  }
-
-  private static List<Object> convertRepeatedField(
-      Schema schema, TableFieldSchema fieldSchema, Object v) {
-    Type arrayType = schema.getType();
-    verify(
-        arrayType == Type.ARRAY,
-        "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
-        fieldSchema.getName(),
-        arrayType);
-    // REPEATED fields are represented as Avro arrays.
-    if (v == null) {
-      // Handle the case of an empty repeated field.
-      return ImmutableList.of();
-    }
-    @SuppressWarnings("unchecked")
-    List<Object> elements = (List<Object>) v;
-    ImmutableList.Builder<Object> values = ImmutableList.builder();
-    Type elementType = schema.getElementType().getType();
-    for (Object element : elements) {
-      values.add(convertRequiredField(elementType, fieldSchema, element));
-    }
-    return values.build();
-  }
-
-  private static Object convertRequiredField(
-      Type avroType, TableFieldSchema fieldSchema, Object v) {
-    // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
-    // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
-    ImmutableMap<String, Type> fieldMap =
-        ImmutableMap.<String, Type>builder()
-            .put("STRING", Type.STRING)
-            .put("INTEGER", Type.LONG)
-            .put("FLOAT", Type.DOUBLE)
-            .put("BOOLEAN", Type.BOOLEAN)
-            .put("TIMESTAMP", Type.LONG)
-            .put("RECORD", Type.RECORD)
-            .build();
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    Type expectedAvroType = fieldMap.get(bqType);
-    verify(
-        avroType == expectedAvroType,
-        "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
-        expectedAvroType,
-        avroType,
-        bqType,
-        fieldSchema.getName());
-    switch (fieldSchema.getType()) {
-      case "STRING":
-        // Avro will use a CharSequence to represent String objects, but it may not always use
-        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-        return v.toString();
-      case "INTEGER":
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        return ((Long) v).toString();
-      case "FLOAT":
-        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
-        return v;
-      case "BOOLEAN":
-        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
-        return v;
-      case "TIMESTAMP":
-        // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
-        // Strings with variable-precision (up to six digits) to match the JSON files export
-        // by BigQuery.
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        Double doubleValue = ((Long) v) / 1000000.0;
-        return formatTimestamp(doubleValue.toString());
-      case "RECORD":
-        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
-        return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Unexpected BigQuery field schema type %s for field named %s",
-                fieldSchema.getType(),
-                fieldSchema.getName()));
-    }
-  }
-
-  @Nullable
-  private static Object convertNullableField(
-      Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
-    // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
-    verify(
-        avroSchema.getType() == Type.UNION,
-        "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
-        avroSchema.getType(),
-        fieldSchema.getName());
-    List<Schema> unionTypes = avroSchema.getTypes();
-    verify(
-        unionTypes.size() == 2,
-        "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
-        fieldSchema.getName(),
-        unionTypes);
-
-    if (v == null) {
-      return null;
-    }
-
-    Type firstType = unionTypes.get(0).getType();
-    if (!firstType.equals(Type.NULL)) {
-      return convertRequiredField(firstType, fieldSchema, v);
-    }
-    return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
deleted file mode 100644
index 514e005..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.BigQueryOptions;
-
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * An interface for real, mock, or fake implementations of Cloud BigQuery services.
- */
-public interface BigQueryServices extends Serializable {
-
-  /**
-   * Returns a real, mock, or fake {@link JobService}.
-   */
-  public JobService getJobService(BigQueryOptions bqOptions);
-
-  /**
-   * Returns a real, mock, or fake {@link DatasetService}.
-   */
-  public DatasetService getDatasetService(BigQueryOptions bqOptions);
-
-  /**
-   * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables.
-   */
-  public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef);
-
-  /**
-   * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
-   */
-  public BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten);
-
-  /**
-   * An interface for the Cloud BigQuery load service.
-   */
-  public interface JobService {
-    /**
-     * Start a BigQuery load job.
-     */
-    void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
-        throws InterruptedException, IOException;
-    /**
-     * Start a BigQuery extract job.
-     */
-    void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
-        throws InterruptedException, IOException;
-
-    /**
-     * Start a BigQuery query job.
-     */
-    void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
-        throws IOException, InterruptedException;
-
-    /**
-     * Waits for the job is Done, and returns the job.
-     *
-     * <p>Returns null if the {@code maxAttempts} retries reached.
-     */
-    Job pollJob(JobReference jobRef, int maxAttempts)
-        throws InterruptedException, IOException;
-
-    /**
-     * Dry runs the query in the given project.
-     */
-    JobStatistics dryRunQuery(String projectId, String query)
-        throws InterruptedException, IOException;
-  }
-
-  /**
-   * An interface to get, create and delete Cloud BigQuery datasets and tables.
-   */
-  public interface DatasetService {
-    /**
-     * Gets the specified {@link Table} resource by table ID.
-     */
-    Table getTable(String projectId, String datasetId, String tableId)
-        throws InterruptedException, IOException;
-
-    /**
-     * Deletes the table specified by tableId from the dataset.
-     * If the table contains data, all the data will be deleted.
-     */
-    void deleteTable(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException;
-
-    /**
-     * Returns true if the table is empty.
-     */
-    boolean isTableEmpty(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException;
-
-    /**
-     * Gets the specified {@link Dataset} resource by dataset ID.
-     */
-    Dataset getDataset(String projectId, String datasetId)
-        throws IOException, InterruptedException;
-
-    /**
-     * Create a {@link Dataset} with the given {@code location} and {@code description}.
-     */
-    void createDataset(String projectId, String datasetId, String location, String description)
-        throws IOException, InterruptedException;
-
-    /**
-     * Deletes the dataset specified by the datasetId value.
-     *
-     * <p>Before you can delete a dataset, you must delete all its tables.
-     */
-    void deleteDataset(String projectId, String datasetId)
-        throws IOException, InterruptedException;
-  }
-
-  /**
-   * An interface to read the Cloud BigQuery directly.
-   */
-  public interface BigQueryJsonReader {
-    /**
-     * Initializes the reader and advances the reader to the first record.
-     */
-    boolean start() throws IOException;
-
-    /**
-     * Advances the reader to the next valid record.
-     */
-    boolean advance() throws IOException;
-
-    /**
-     * Returns the value of the data item that was read by the last {@link #start} or
-     * {@link #advance} call. The returned value must be effectively immutable and remain valid
-     * indefinitely.
-     *
-     * <p>Multiple calls to this method without an intervening call to {@link #advance} should
-     * return the same result.
-     *
-     * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
-     *         the last {@link #start} or {@link #advance} returned {@code false}.
-     */
-    TableRow getCurrent() throws NoSuchElementException;
-
-    /**
-     * Closes the reader. The reader cannot be used after this method is called.
-     */
-    void close() throws IOException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
deleted file mode 100644
index 1aadeb2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.BigQueryOptions;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.NoSuchElementException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery
- * service.
- */
-public class BigQueryServicesImpl implements BigQueryServices {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
-
-  // The maximum number of attempts to execute a BigQuery RPC.
-  private static final int MAX_RPC_ATTEMPTS = 10;
-
-  // The initial backoff for executing a BigQuery RPC.
-  private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
-
-  // The initial backoff for polling the status of a BigQuery job.
-  private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
-
-  @Override
-  public JobService getJobService(BigQueryOptions options) {
-    return new JobServiceImpl(options);
-  }
-
-  @Override
-  public DatasetService getDatasetService(BigQueryOptions options) {
-    return new DatasetServiceImpl(options);
-  }
-
-  @Override
-  public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
-    return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef);
-  }
-
-  @Override
-  public BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
-    return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten);
-  }
-
-  @VisibleForTesting
-  static class JobServiceImpl implements BigQueryServices.JobService {
-    private final ApiErrorExtractor errorExtractor;
-    private final Bigquery client;
-
-    @VisibleForTesting
-    JobServiceImpl(Bigquery client) {
-      this.errorExtractor = new ApiErrorExtractor();
-      this.client = client;
-    }
-
-    private JobServiceImpl(BigQueryOptions options) {
-      this.errorExtractor = new ApiErrorExtractor();
-      this.client = Transport.newBigQueryClient(options).build();
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public void startLoadJob(
-        JobReference jobRef,
-        JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
-      Job job = new Job()
-          .setJobReference(jobRef)
-          .setConfiguration(new JobConfiguration().setLoad(loadConfig));
-
-      startJob(job, errorExtractor, client);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
-        throws InterruptedException, IOException {
-      Job job = new Job()
-          .setJobReference(jobRef)
-          .setConfiguration(
-              new JobConfiguration().setExtract(extractConfig));
-
-      startJob(job, errorExtractor, client);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
-        throws IOException, InterruptedException {
-      Job job = new Job()
-          .setJobReference(jobRef)
-          .setConfiguration(
-              new JobConfiguration().setQuery(queryConfig));
-
-      startJob(job, errorExtractor, client);
-    }
-
-    private static void startJob(Job job,
-      ApiErrorExtractor errorExtractor,
-      Bigquery client) throws IOException, InterruptedException {
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
-    }
-
-    @VisibleForTesting
-    static void startJob(
-        Job job,
-        ApiErrorExtractor errorExtractor,
-        Bigquery client,
-        Sleeper sleeper,
-        BackOff backoff) throws IOException, InterruptedException {
-      JobReference jobRef = job.getJobReference();
-      Exception lastException = null;
-      do {
-        try {
-          client.jobs().insert(jobRef.getProjectId(), job).execute();
-          return; // SUCCEEDED
-        } catch (GoogleJsonResponseException e) {
-          if (errorExtractor.itemAlreadyExists(e)) {
-            return; // SUCCEEDED
-          }
-          // ignore and retry
-          LOG.warn("Ignore the error and retry inserting the job.", e);
-          lastException = e;
-        } catch (IOException e) {
-          // ignore and retry
-          LOG.warn("Ignore the error and retry inserting the job.", e);
-          lastException = e;
-        }
-      } while (nextBackOff(sleeper, backoff));
-      throw new IOException(
-          String.format(
-              "Unable to insert job: %s, aborting after %d retries.",
-              jobRef.getJobId(), MAX_RPC_ATTEMPTS),
-          lastException);
-    }
-
-    @Override
-    public Job pollJob(JobReference jobRef, int maxAttempts)
-        throws InterruptedException {
-      BackOff backoff = new AttemptBoundedExponentialBackOff(
-          maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
-      return pollJob(jobRef, Sleeper.DEFAULT, backoff);
-    }
-
-    @VisibleForTesting
-    Job pollJob(
-        JobReference jobRef,
-        Sleeper sleeper,
-        BackOff backoff) throws InterruptedException {
-      do {
-        try {
-          Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
-          JobStatus status = job.getStatus();
-          if (status != null && status.getState() != null && status.getState().equals("DONE")) {
-            return job;
-          }
-          // The job is not DONE, wait longer and retry.
-        } catch (IOException e) {
-          // ignore and retry
-          LOG.warn("Ignore the error and retry polling job status.", e);
-        }
-      } while (nextBackOff(sleeper, backoff));
-      LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobRef.getJobId());
-      return null;
-    }
-
-    @Override
-    public JobStatistics dryRunQuery(String projectId, String query)
-        throws InterruptedException, IOException {
-      Job job = new Job()
-          .setConfiguration(new JobConfiguration()
-              .setQuery(new JobConfigurationQuery()
-                  .setQuery(query))
-              .setDryRun(true));
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      return executeWithRetries(
-          client.jobs().insert(projectId, job),
-          String.format(
-              "Unable to dry run query: %s, aborting after %d retries.",
-              query, MAX_RPC_ATTEMPTS),
-          Sleeper.DEFAULT,
-          backoff).getStatistics();
-    }
-  }
-
-  @VisibleForTesting
-  static class DatasetServiceImpl implements DatasetService {
-    private final ApiErrorExtractor errorExtractor;
-    private final Bigquery client;
-
-    @VisibleForTesting
-    DatasetServiceImpl(Bigquery client) {
-      this.errorExtractor = new ApiErrorExtractor();
-      this.client = client;
-    }
-
-    private DatasetServiceImpl(BigQueryOptions bqOptions) {
-      this.errorExtractor = new ApiErrorExtractor();
-      this.client = Transport.newBigQueryClient(bqOptions).build();
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public Table getTable(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException {
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      return executeWithRetries(
-          client.tables().get(projectId, datasetId, tableId),
-          String.format(
-              "Unable to get table: %s, aborting after %d retries.",
-              tableId, MAX_RPC_ATTEMPTS),
-          Sleeper.DEFAULT,
-          backoff);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public void deleteTable(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException {
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      executeWithRetries(
-          client.tables().delete(projectId, datasetId, tableId),
-          String.format(
-              "Unable to delete table: %s, aborting after %d retries.",
-              tableId, MAX_RPC_ATTEMPTS),
-          Sleeper.DEFAULT,
-          backoff);
-    }
-
-    @Override
-    public boolean isTableEmpty(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException {
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      TableDataList dataList = executeWithRetries(
-          client.tabledata().list(projectId, datasetId, tableId),
-          String.format(
-              "Unable to list table data: %s, aborting after %d retries.",
-              tableId, MAX_RPC_ATTEMPTS),
-          Sleeper.DEFAULT,
-          backoff);
-      return dataList.getRows() == null || dataList.getRows().isEmpty();
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public Dataset getDataset(String projectId, String datasetId)
-        throws IOException, InterruptedException {
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      return executeWithRetries(
-          client.datasets().get(projectId, datasetId),
-          String.format(
-              "Unable to get dataset: %s, aborting after %d retries.",
-              datasetId, MAX_RPC_ATTEMPTS),
-          Sleeper.DEFAULT,
-          backoff);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public void createDataset(
-        String projectId, String datasetId, String location, String description)
-        throws IOException, InterruptedException {
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
-    }
-
-    @VisibleForTesting
-    void createDataset(
-        String projectId,
-        String datasetId,
-        String location,
-        String description,
-        Sleeper sleeper,
-        BackOff backoff) throws IOException, InterruptedException {
-      DatasetReference datasetRef = new DatasetReference()
-          .setProjectId(projectId)
-          .setDatasetId(datasetId);
-
-      Dataset dataset = new Dataset()
-          .setDatasetReference(datasetRef)
-          .setLocation(location)
-          .setFriendlyName(location)
-          .setDescription(description);
-
-      Exception lastException;
-      do {
-        try {
-          client.datasets().insert(projectId, dataset).execute();
-          return; // SUCCEEDED
-        } catch (GoogleJsonResponseException e) {
-          if (errorExtractor.itemAlreadyExists(e)) {
-            return; // SUCCEEDED
-          }
-          // ignore and retry
-          LOG.warn("Ignore the error and retry creating the dataset.", e);
-          lastException = e;
-        } catch (IOException e) {
-          LOG.warn("Ignore the error and retry creating the dataset.", e);
-          lastException = e;
-        }
-      } while (nextBackOff(sleeper, backoff));
-      throw new IOException(
-          String.format(
-              "Unable to create dataset: %s, aborting after %d retries.",
-              datasetId, MAX_RPC_ATTEMPTS),
-          lastException);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
-     *
-     * @throws IOException if it exceeds max RPC retries.
-     */
-    @Override
-    public void deleteDataset(String projectId, String datasetId)
-        throws IOException, InterruptedException {
-      BackOff backoff =
-          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
-      executeWithRetries(
-          client.datasets().delete(projectId, datasetId),
-          String.format(
-              "Unable to delete table: %s, aborting after %d retries.",
-              datasetId, MAX_RPC_ATTEMPTS),
-          Sleeper.DEFAULT,
-          backoff);
-    }
-  }
-
-  private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
-    BigQueryTableRowIterator iterator;
-
-    private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
-      this.iterator = iterator;
-    }
-
-    private static BigQueryJsonReader fromQuery(
-        BigQueryOptions bqOptions,
-        String query,
-        String projectId,
-        @Nullable Boolean flattenResults) {
-      return new BigQueryJsonReaderImpl(
-          BigQueryTableRowIterator.fromQuery(
-              query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults));
-    }
-
-    private static BigQueryJsonReader fromTable(
-        BigQueryOptions bqOptions,
-        TableReference tableRef) {
-      return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
-          tableRef, Transport.newBigQueryClient(bqOptions).build()));
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      try {
-        iterator.open();
-        return iterator.advance();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted during start() operation", e);
-      }
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      try {
-        return iterator.advance();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted during advance() operation", e);
-      }
-    }
-
-    @Override
-    public TableRow getCurrent() throws NoSuchElementException {
-      return iterator.getCurrent();
-    }
-
-    @Override
-    public void close() throws IOException {
-      iterator.close();
-    }
-  }
-
-  @VisibleForTesting
-  static <T> T executeWithRetries(
-      AbstractGoogleClientRequest<T> request,
-      String errorMessage,
-      Sleeper sleeper,
-      BackOff backoff)
-      throws IOException, InterruptedException {
-    Exception lastException = null;
-    do {
-      try {
-        return request.execute();
-      } catch (IOException e) {
-        LOG.warn("Ignore the error and retry the request.", e);
-        lastException = e;
-      }
-    } while (nextBackOff(sleeper, backoff));
-    throw new IOException(
-        errorMessage,
-        lastException);
-  }
-
-  /**
-   * Identical to {@link BackOffUtils#next} but without checked IOException.
-   * @throws InterruptedException
-   */
-  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
-    try {
-      return BackOffUtils.next(sleeper, backoff);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
deleted file mode 100644
index 84004a7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * Inserts rows into BigQuery.
- */
-public class BigQueryTableInserter {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
-
-  // Approximate amount of table data to upload per InsertAll request.
-  private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
-
-  // The maximum number of rows to upload per InsertAll request.
-  private static final long MAX_ROWS_PER_BATCH = 500;
-
-  // The maximum number of times to retry inserting rows into BigQuery.
-  private static final int MAX_INSERT_ATTEMPTS = 5;
-
-  // The initial backoff after a failure inserting rows into BigQuery.
-  private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
-
-  // Backoff time bounds for rate limit exceeded errors.
-  private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
-  private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
-
-  private final Bigquery client;
-  private final TableReference defaultRef;
-  private final long maxRowsPerBatch;
-
-  private ExecutorService executor;
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param options a PipelineOptions object
-   */
-  public BigQueryTableInserter(Bigquery client, PipelineOptions options) {
-    this.client = client;
-    this.defaultRef = null;
-    this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
-    this.executor = options.as(GcsOptions.class).getExecutorService();
-  }
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param options a PipelineOptions object
-   * @param defaultRef identifies the table to insert into
-   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions)}
-   */
-  @Deprecated
-  public BigQueryTableInserter(Bigquery client, PipelineOptions options,
-                               TableReference defaultRef) {
-    this.client = client;
-    this.defaultRef = defaultRef;
-    this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
-    this.executor = options.as(GcsOptions.class).getExecutorService();
-  }
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param options a PipelineOptions object
-   * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
-   */
-  public BigQueryTableInserter(Bigquery client, PipelineOptions options,
-                               int maxRowsPerBatch) {
-    this.client = client;
-    this.defaultRef = null;
-    this.maxRowsPerBatch = maxRowsPerBatch;
-    this.executor = options.as(GcsOptions.class).getExecutorService();
-  }
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param defaultRef identifies the default table to insert into
-   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions, int)}
-   */
-  @Deprecated
-  public BigQueryTableInserter(Bigquery client, PipelineOptions options,
-                               TableReference defaultRef, int maxRowsPerBatch) {
-    this.client = client;
-    this.defaultRef = defaultRef;
-    this.maxRowsPerBatch = maxRowsPerBatch;
-    this.executor = options.as(GcsOptions.class).getExecutorService();
-  }
-
-  /**
-   * Insert all rows from the given list.
-   *
-   * @deprecated replaced by {@link #insertAll(TableReference, List)}
-   */
-  @Deprecated
-  public void insertAll(List<TableRow> rowList) throws IOException {
-    insertAll(defaultRef, rowList, null, null);
-  }
-
-  /**
-   * Insert all rows from the given list using specified insertIds if not null.
-   *
-   * @deprecated replaced by {@link #insertAll(TableReference, List, List)}
-   */
-  @Deprecated
-  public void insertAll(List<TableRow> rowList,
-      @Nullable List<String> insertIdList) throws IOException {
-    insertAll(defaultRef, rowList, insertIdList, null);
-  }
-
-  /**
-   * Insert all rows from the given list.
-   */
-  public void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
-    insertAll(ref, rowList, null, null);
-  }
-
-  /**
-   * Insert all rows from the given list using specified insertIds if not null. Track count of
-   * bytes written with the Aggregator.
-   */
-  public void insertAll(TableReference ref, List<TableRow> rowList,
-      @Nullable List<String> insertIdList, Aggregator<Long, Long> byteCountAggregator)
-      throws IOException {
-    checkNotNull(ref, "ref");
-    if (insertIdList != null && rowList.size() != insertIdList.size()) {
-      throw new AssertionError("If insertIdList is not null it needs to have at least "
-          + "as many elements as rowList");
-    }
-
-    AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-        MAX_INSERT_ATTEMPTS,
-        INITIAL_INSERT_BACKOFF_INTERVAL_MS);
-
-    List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
-    // These lists contain the rows to publish. Initially the contain the entire list. If there are
-    // failures, they will contain only the failed rows to be retried.
-    List<TableRow> rowsToPublish = rowList;
-    List<String> idsToPublish = insertIdList;
-    while (true) {
-      List<TableRow> retryRows = new ArrayList<>();
-      List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
-
-      int strideIndex = 0;
-      // Upload in batches.
-      List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
-      int dataSize = 0;
-
-      List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
-      List<Integer> strideIndices = new ArrayList<>();
-
-      for (int i = 0; i < rowsToPublish.size(); ++i) {
-        TableRow row = rowsToPublish.get(i);
-        TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-        if (idsToPublish != null) {
-          out.setInsertId(idsToPublish.get(i));
-        }
-        out.setJson(row.getUnknownKeys());
-        rows.add(out);
-
-        dataSize += row.toString().length();
-        if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
-            || i == rowsToPublish.size() - 1) {
-          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-          content.setRows(rows);
-
-          final Bigquery.Tabledata.InsertAll insert = client.tabledata()
-              .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
-                  content);
-
-          futures.add(
-              executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
-                @Override
-                public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
-                  BackOff backoff = new IntervalBoundedExponentialBackOff(
-                      MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
-                  while (true) {
-                    try {
-                      return insert.execute().getInsertErrors();
-                    } catch (IOException e) {
-                      if (new ApiErrorExtractor().rateLimited(e)) {
-                        LOG.info("BigQuery insertAll exceeded rate limit, retrying");
-                        try {
-                          Thread.sleep(backoff.nextBackOffMillis());
-                        } catch (InterruptedException interrupted) {
-                          throw new IOException(
-                              "Interrupted while waiting before retrying insertAll");
-                        }
-                      } else {
-                        throw e;
-                      }
-                    }
-                  }
-                }
-              }));
-          strideIndices.add(strideIndex);
-
-          if (byteCountAggregator != null) {
-            byteCountAggregator.addValue(Long.valueOf(dataSize));
-          }
-          dataSize = 0;
-          strideIndex = i + 1;
-          rows = new LinkedList<>();
-        }
-      }
-
-      try {
-        for (int i = 0; i < futures.size(); i++) {
-          List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
-          if (errors != null) {
-            for (TableDataInsertAllResponse.InsertErrors error : errors) {
-              allErrors.add(error);
-              if (error.getIndex() == null) {
-                throw new IOException("Insert failed: " + allErrors);
-              }
-
-              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
-              retryRows.add(rowsToPublish.get(errorIndex));
-              if (retryIds != null) {
-                retryIds.add(idsToPublish.get(errorIndex));
-              }
-            }
-          }
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Interrupted while inserting " + rowsToPublish);
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e.getCause());
-      }
-
-      if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
-        try {
-          Thread.sleep(backoff.nextBackOffMillis());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
-        }
-        LOG.info("Retrying failed inserts to BigQuery");
-        rowsToPublish = retryRows;
-        idsToPublish = retryIds;
-        allErrors.clear();
-      } else {
-        break;
-      }
-    }
-    if (!allErrors.isEmpty()) {
-      throw new IOException("Insert failed: " + allErrors);
-    }
-  }
-
-  /**
-   * Retrieves or creates the table.
-   *
-   * <p>The table is checked to conform to insertion requirements as specified
-   * by WriteDisposition and CreateDisposition.
-   *
-   * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
-   * this will re-create the table if necessary to ensure it is empty.
-   *
-   * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
-   * will fail if the table exists and is not empty.
-   *
-   * <p>When constructing a table, a {@code TableSchema} must be available.  If a
-   * schema is provided, then it will be used.  If no schema is provided, but
-   * an existing table is being cleared (WRITE_TRUNCATE option above), then
-   * the existing schema will be re-used.  If no schema is available, then an
-   * {@code IOException} is thrown.
-   */
-  public Table getOrCreateTable(
-      TableReference ref,
-      WriteDisposition writeDisposition,
-      CreateDisposition createDisposition,
-      @Nullable TableSchema schema) throws IOException {
-    // Check if table already exists.
-    Bigquery.Tables.Get get = client.tables()
-        .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-    Table table = null;
-    try {
-      table = get.execute();
-    } catch (IOException e) {
-      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-      if (!errorExtractor.itemNotFound(e)
-          || createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
-        // Rethrow.
-        throw e;
-      }
-    }
-
-    // If we want an empty table, and it isn't, then delete it first.
-    if (table != null) {
-      if (writeDisposition == WriteDisposition.WRITE_APPEND) {
-        return table;
-      }
-
-      boolean empty = isEmpty(ref);
-      if (empty) {
-        if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
-          LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
-        }
-        return table;
-
-      } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
-        throw new IOException("WriteDisposition is WRITE_EMPTY, "
-            + "but table is not empty");
-      }
-
-      // Reuse the existing schema if none was provided.
-      if (schema == null) {
-        schema = table.getSchema();
-      }
-
-      // Delete table and fall through to re-creating it below.
-      LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
-      Bigquery.Tables.Delete delete = client.tables()
-          .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-      delete.execute();
-    }
-
-    if (schema == null) {
-      throw new IllegalArgumentException(
-          "Table schema required for new table.");
-    }
-
-    // Create the table.
-    return tryCreateTable(ref, schema);
-  }
-
-  /**
-   * Checks if a table is empty.
-   */
-  public boolean isEmpty(TableReference ref) throws IOException {
-    Bigquery.Tabledata.List list = client.tabledata()
-        .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-    list.setMaxResults(1L);
-    TableDataList dataList = list.execute();
-
-    return dataList.getRows() == null || dataList.getRows().isEmpty();
-  }
-
-  /**
-   * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
-   * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
-   * configured with a table spec function to use different tables for each window.
-   */
-  private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
-
-  /**
-   * Tries to create the BigQuery table.
-   * If a table with the same name already exists in the dataset, the table
-   * creation fails, and the function returns null.  In such a case,
-   * the existing table doesn't necessarily have the same schema as specified
-   * by the parameter.
-   *
-   * @param schema Schema of the new BigQuery table.
-   * @return The newly created BigQuery table information, or null if the table
-   *     with the same name already exists.
-   * @throws IOException if other error than already existing table occurs.
-   */
-  @Nullable
-  public Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
-    LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
-    BackOff backoff =
-        new ExponentialBackOff.Builder()
-            .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
-            .build();
-
-    Table table = new Table().setTableReference(ref).setSchema(schema);
-    return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
-  }
-
-  @VisibleForTesting
-  @Nullable
-  Table tryCreateTable(
-      Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
-          throws IOException {
-    boolean retry = false;
-    while (true) {
-      try {
-        return client.tables().insert(projectId, datasetId, table).execute();
-      } catch (IOException e) {
-        ApiErrorExtractor extractor = new ApiErrorExtractor();
-        if (extractor.itemAlreadyExists(e)) {
-          // The table already exists, nothing to return.
-          return null;
-        } else if (extractor.rateLimited(e)) {
-          // The request failed because we hit a temporary quota. Back off and try again.
-          try {
-            if (BackOffUtils.next(sleeper, backoff)) {
-              if (!retry) {
-                LOG.info(
-                    "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
-                    projectId,
-                    datasetId,
-                    table.getTableReference().getTableId(),
-                    TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
-                retry = true;
-              }
-              continue;
-            }
-          } catch (InterruptedException e1) {
-            // Restore interrupted state and throw the last failure.
-            Thread.currentThread().interrupt();
-            throw e;
-          }
-        }
-        throw e;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
deleted file mode 100644
index 3865654..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ClassInfo;
-import com.google.api.client.util.Data;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * Iterates over all rows in a table.
- */
-public class BigQueryTableRowIterator implements AutoCloseable {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class);
-
-  @Nullable private TableReference ref;
-  @Nullable private final String projectId;
-  @Nullable private TableSchema schema;
-  private final Bigquery client;
-  private String pageToken;
-  private Iterator<TableRow> iteratorOverCurrentBatch;
-  private TableRow current;
-  // Set true when the final page is seen from the service.
-  private boolean lastPage = false;
-
-  // The maximum number of times a BigQuery request will be retried
-  private static final int MAX_RETRIES = 3;
-  // Initial wait time for the backoff implementation
-  private static final Duration INITIAL_BACKOFF_TIME = Duration.standardSeconds(1);
-
-  // After sending a query to BQ service we will be polling the BQ service to check the status with
-  // following interval to check the status of query execution job
-  private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
-
-  private final String query;
-  // Whether to flatten query results.
-  private final boolean flattenResults;
-  // Temporary dataset used to store query results.
-  private String temporaryDatasetId = null;
-  // Temporary table used to store query results.
-  private String temporaryTableId = null;
-
-  private BigQueryTableRowIterator(
-      @Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
-      Bigquery client, boolean flattenResults) {
-    this.ref = ref;
-    this.query = query;
-    this.projectId = projectId;
-    this.client = checkNotNull(client, "client");
-    this.flattenResults = flattenResults;
-  }
-
-  /**
-   * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
-   */
-  public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
-    checkNotNull(ref, "ref");
-    checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true);
-  }
-
-  /**
-   * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
-   * specified query in the specified project.
-   */
-  public static BigQueryTableRowIterator fromQuery(
-      String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) {
-    checkNotNull(query, "query");
-    checkNotNull(projectId, "projectId");
-    checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(null, query, projectId, client,
-        MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
-  }
-
-  /**
-   * Opens the table for read.
-   * @throws IOException on failure
-   */
-  public void open() throws IOException, InterruptedException {
-    if (query != null) {
-      ref = executeQueryAndWaitForCompletion();
-    }
-    // Get table schema.
-    Bigquery.Tables.Get get =
-        client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-
-    Table table =
-        executeWithBackOff(
-            get,
-            "Error opening BigQuery table  %s of dataset %s  : {}",
-            ref.getTableId(),
-            ref.getDatasetId());
-    schema = table.getSchema();
-  }
-
-  public boolean advance() throws IOException, InterruptedException {
-    while (true) {
-      if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
-        // Embed schema information into the raw row, so that values have an
-        // associated key.  This matches how rows are read when using the
-        // DataflowRunner.
-        current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
-        return true;
-      }
-      if (lastPage) {
-        return false;
-      }
-
-      Bigquery.Tabledata.List list =
-          client.tabledata().list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-      if (pageToken != null) {
-        list.setPageToken(pageToken);
-      }
-
-      TableDataList result =
-          executeWithBackOff(
-              list,
-              "Error reading from BigQuery table %s of dataset %s : {}",
-              ref.getTableId(),
-              ref.getDatasetId());
-
-      pageToken = result.getPageToken();
-      iteratorOverCurrentBatch =
-          result.getRows() != null
-              ? result.getRows().iterator()
-              : Collections.<TableRow>emptyIterator();
-
-      // The server may return a page token indefinitely on a zero-length table.
-      if (pageToken == null || result.getTotalRows() != null && result.getTotalRows() == 0) {
-        lastPage = true;
-      }
-    }
-  }
-
-  public TableRow getCurrent() {
-    if (current == null) {
-      throw new NoSuchElementException();
-    }
-    return current;
-  }
-
-  /**
-   * Adjusts a field returned from the BigQuery API to match what we will receive when running
-   * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
-   * used for batch jobs executed on the Cloud Dataflow service.
-   *
-   * <p>The following is the relationship between BigQuery schema and Java types:
-   *
-   * <ul>
-   *   <li>Nulls are {@code null}.
-   *   <li>Repeated fields are {@code List} of objects.
-   *   <li>Record columns are {@link TableRow} objects.
-   *   <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
-   *   <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
-   *   <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
-   *       {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
-   *       zeros and can be 1 to 6 digits long.
-   *   <li>Every other atomic type is a {@code String}.
-   * </ul>
-   *
-   * <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
-   *
-   * <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
-   * and are not accessible through the {@link TableRow#getF} function.
-   */
-  @Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
-    if (Data.isNull(v)) {
-      return null;
-    }
-
-    if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
-      TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
-      @SuppressWarnings("unchecked")
-      List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
-      ImmutableList.Builder<Object> values = ImmutableList.builder();
-      for (Map<String, Object> element : rawCells) {
-        values.add(getTypedCellValue(elementSchema, element.get("v")));
-      }
-      return values.build();
-    }
-
-    if (fieldSchema.getType().equals("RECORD")) {
-      @SuppressWarnings("unchecked")
-      Map<String, Object> typedV = (Map<String, Object>) v;
-      return getTypedTableRow(fieldSchema.getFields(), typedV);
-    }
-
-    if (fieldSchema.getType().equals("FLOAT")) {
-      return Double.parseDouble((String) v);
-    }
-
-    if (fieldSchema.getType().equals("BOOLEAN")) {
-      return Boolean.parseBoolean((String) v);
-    }
-
-    if (fieldSchema.getType().equals("TIMESTAMP")) {
-      return AvroUtils.formatTimestamp((String) v);
-    }
-
-    return v;
-  }
-
-  /**
-   * A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
-   * because they are reserved keywords in {@link TableRow}.
-   */
-  // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
-  // not indirect through our broken use of {@link TableRow}.
-  //     See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
-  private static final Collection<String> RESERVED_FIELD_NAMES =
-      ClassInfo.of(TableRow.class).getNames();
-
-  /**
-   * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
-   * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
-   * the cells are converted to Java types according to the provided field schemas.
-   *
-   * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
-   * types are mapped to Java types.
-   */
-  private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
-    // If rawRow is a TableRow, use it. If not, create a new one.
-    TableRow row;
-    List<? extends Map<String, Object>> cells;
-    if (rawRow instanceof TableRow) {
-      // Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
-      // any type conversion, but extract the cells for cell-wise processing below.
-      row = (TableRow) rawRow;
-      cells = row.getF();
-      // Clear the cells from the row, so that row.getF() will return null. This matches the
-      // behavior of rows produced by the BigQuery export API used on the service.
-      row.setF(null);
-    } else {
-      row = new TableRow();
-
-      // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
-      // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
-      // we will use Map.get("v") instead of TableCell.getV() get its value.
-      @SuppressWarnings("unchecked")
-      List<? extends Map<String, Object>> rawCells =
-          (List<? extends Map<String, Object>>) rawRow.get("f");
-      cells = rawCells;
-    }
-
-    checkState(cells.size() == fields.size(),
-        "Expected that the row has the same number of cells %s as fields in the schema %s",
-        cells.size(), fields.size());
-
-    // Loop through all the fields in the row, normalizing their types with the TableFieldSchema
-    // and storing the normalized values by field name in the Map<String, Object> that
-    // underlies the TableRow.
-    Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
-    Iterator<TableFieldSchema> fieldIt = fields.iterator();
-    while (cellIt.hasNext()) {
-      Map<String, Object> cell = cellIt.next();
-      TableFieldSchema fieldSchema = fieldIt.next();
-
-      // Convert the object in this cell to the Java type corresponding to its type in the schema.
-      Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
-
-      String fieldName = fieldSchema.getName();
-      checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
-          "BigQueryIO does not support records with columns named %s", fieldName);
-
-      if (convertedValue == null) {
-        // BigQuery does not include null values when the export operation (to JSON) is used.
-        // To match that behavior, BigQueryTableRowiterator, and the DirectRunner,
-        // intentionally omits columns with null values.
-        continue;
-      }
-
-      row.set(fieldName, convertedValue);
-    }
-    return row;
-  }
-
-  // Create a new BigQuery dataset
-  private void createDataset(String datasetId) throws IOException, InterruptedException {
-    Dataset dataset = new Dataset();
-    DatasetReference reference = new DatasetReference();
-    reference.setProjectId(projectId);
-    reference.setDatasetId(datasetId);
-    dataset.setDatasetReference(reference);
-
-    String createDatasetError =
-        "Error when trying to create the temporary dataset " + datasetId + " in project "
-        + projectId;
-    executeWithBackOff(
-        client.datasets().insert(projectId, dataset), createDatasetError + " :{}");
-  }
-
-  // Delete the given table that is available in the given dataset.
-  private void deleteTable(String datasetId, String tableId)
-      throws IOException, InterruptedException {
-    executeWithBackOff(
-        client.tables().delete(projectId, datasetId, tableId),
-        "Error when trying to delete the temporary table " + datasetId + " in dataset " + datasetId
-        + " of project " + projectId + ". Manual deletion may be required. Error message : {}");
-  }
-
-  // Delete the given dataset. This will fail if the given dataset has any tables.
-  private void deleteDataset(String datasetId) throws IOException, InterruptedException {
-    executeWithBackOff(
-        client.datasets().delete(projectId, datasetId),
-        "Error when trying to delete the temporary dataset " + datasetId + " in project "
-        + projectId + ". Manual deletion may be required. Error message : {}");
-  }
-
-  /**
-   * Executes the specified query and returns a reference to the temporary BigQuery table created
-   * to hold the results.
-   *
-   * @throws IOException if the query fails.
-   */
-  private TableReference executeQueryAndWaitForCompletion()
-      throws IOException, InterruptedException {
-    // Create a temporary dataset to store results.
-    // Starting dataset name with an "_" so that it is hidden.
-    Random rnd = new Random(System.currentTimeMillis());
-    temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
-    temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
-
-    createDataset(temporaryDatasetId);
-    Job job = new Job();
-    JobConfiguration config = new JobConfiguration();
-    JobConfigurationQuery queryConfig = new JobConfigurationQuery();
-    config.setQuery(queryConfig);
-    job.setConfiguration(config);
-    queryConfig.setQuery(query);
-    queryConfig.setAllowLargeResults(true);
-    queryConfig.setFlattenResults(flattenResults);
-
-    TableReference destinationTable = new TableReference();
-    destinationTable.setProjectId(projectId);
-    destinationTable.setDatasetId(temporaryDatasetId);
-    destinationTable.setTableId(temporaryTableId);
-    queryConfig.setDestinationTable(destinationTable);
-
-    Insert insert = client.jobs().insert(projectId, job);
-    Job queryJob = executeWithBackOff(
-        insert, "Error when trying to execute the job for query " + query + " :{}");
-    JobReference jobId = queryJob.getJobReference();
-
-    while (true) {
-      Job pollJob = executeWithBackOff(
-          client.jobs().get(projectId, jobId.getJobId()),
-          "Error when trying to get status of the job for query " + query + " :{}");
-      JobStatus status = pollJob.getStatus();
-      if (status.getState().equals("DONE")) {
-        // Job is DONE, but did not necessarily succeed.
-        ErrorProto error = status.getErrorResult();
-        if (error == null) {
-          return pollJob.getConfiguration().getQuery().getDestinationTable();
-        } else {
-          // There will be no temporary table to delete, so null out the reference.
-          temporaryTableId = null;
-          throw new IOException("Executing query " + query + " failed: " + error.getMessage());
-        }
-      }
-      Uninterruptibles.sleepUninterruptibly(
-          QUERY_COMPLETION_POLL_TIME.getMillis(), TimeUnit.MILLISECONDS);
-    }
-  }
-
-  // Execute a BQ request with exponential backoff and return the result.
-  // client - BQ request to be executed
-  // error - Formatted message to log if when a request fails. Takes exception message as a
-  // formatter parameter.
-  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
-      Object... errorArgs) throws IOException, InterruptedException {
-    Sleeper sleeper = Sleeper.DEFAULT;
-    BackOff backOff =
-        new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
-
-    T result = null;
-    while (true) {
-      try {
-        result = client.execute();
-        break;
-      } catch (IOException e) {
-        LOG.error(String.format(error, errorArgs), e.getMessage());
-        if (!BackOffUtils.next(sleeper, backOff)) {
-          LOG.error(
-              String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times.");
-          throw e;
-        }
-      }
-    }
-
-    return result;
-  }
-
-  @Override
-  public void close() {
-    // Prevent any further requests.
-    lastPage = true;
-
-    try {
-      // Deleting temporary table and dataset that gets generated when executing a query.
-      if (temporaryDatasetId != null) {
-        if (temporaryTableId != null) {
-          deleteTable(temporaryDatasetId, temporaryTableId);
-        }
-        deleteDataset(temporaryDatasetId);
-      }
-    } catch (IOException | InterruptedException e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw new RuntimeException(e);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index a616f5a..ca3f0ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -19,13 +19,12 @@ package org.apache.beam.sdk.values;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -40,19 +39,15 @@ import org.apache.beam.sdk.util.WindowingStrategy;
  * be passed as the inputs of other PTransforms.
  *
  * <p>Some root transforms produce bounded {@code PCollections} and others
- * produce unbounded ones.  For example, {@link TextIO.Read} reads a static set
- * of files, so it produces a bounded {@link PCollection}.
- * {@link PubsubIO.Read}, on the other hand, receives a potentially infinite stream
- * of Pubsub messages, so it produces an unbounded {@link PCollection}.
+ * produce unbounded ones. For example, {@link CountingInput#upTo} produces a fixed set of integers,
+ * so it produces a bounded {@link PCollection}. {@link CountingInput#unbounded} produces all
+ * integers as an infinite stream, so it produces an unbounded {@link PCollection}.
  *
- * <p>Each element in a {@link PCollection} may have an associated implicit
- * timestamp.  Readers assign timestamps to elements when they create
- * {@link PCollection PCollections}, and other {@link PTransform PTransforms} propagate these
- * timestamps from their input to their output. For example, {@link PubsubIO.Read}
- * assigns pubsub message timestamps to elements, and {@link TextIO.Read} assigns
- * the default value {@link BoundedWindow#TIMESTAMP_MIN_VALUE} to elements. User code can
- * explicitly assign timestamps to elements with
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#outputWithTimestamp}.
+ * <p>Each element in a {@link PCollection} has an associated timestamp. Readers assign timestamps
+ * to elements when they create {@link PCollection PCollections}, and other
+ * {@link PTransform PTransforms} propagate these timestamps from their input to their output. See
+ * the documentation on {@link BoundedReader} and {@link UnboundedReader} for more information on
+ * how these readers produce timestamps and watermarks.
  *
  * <p>Additionally, a {@link PCollection} has an associated
  * {@link WindowFn} and each element is assigned to a set of windows.
@@ -73,14 +68,11 @@ public class PCollection<T> extends TypedPValue<T> {
    */
   public enum IsBounded {
     /**
-     * Indicates that a {@link PCollection} contains bounded data elements, such as
-     * {@link PCollection PCollections} from {@link TextIO}, {@link BigQueryIO},
-     * {@link Create} e.t.c.
+     * Indicates that a {@link PCollection} contains a bounded number of elements.
      */
     BOUNDED,
     /**
-     * Indicates that a {@link PCollection} contains unbounded data elements, such as
-     * {@link PCollection PCollections} from {@link PubsubIO}.
+     * Indicates that a {@link PCollection} contains an unbounded number of elements.
      */
     UNBOUNDED;