You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:38 UTC

[14/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java
deleted file mode 100644
index b6a1493..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * TimerInternals that uses priority queues to manage the timers that are ready to fire.
- */
-public class BatchTimerInternals implements TimerInternals {
-  /** Set of timers that are scheduled used for deduplicating timers. */
-  private Set<TimerData> existingTimers = new HashSet<>();
-
-  // Keep these queues separate so we can advance over them separately.
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
-  private Instant inputWatermarkTime;
-  private Instant processingTime;
-
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
-    return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers;
-  }
-
-  public BatchTimerInternals(Instant processingTime) {
-    this.processingTime = processingTime;
-    this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-  }
-
-  @Override
-  public void setTimer(TimerData timer) {
-    if (existingTimers.add(timer)) {
-      queue(timer.getDomain()).add(timer);
-    }
-  }
-
-  @Override
-  public void deleteTimer(TimerData timer) {
-    existingTimers.remove(timer);
-    queue(timer.getDomain()).remove(timer);
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTime;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
-   * is already complete.
-   */
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return inputWatermarkTime;
-  }
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    // The output watermark is always undefined in batch mode.
-    return null;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("watermarkTimers", watermarkTimers)
-        .add("processingTimers", processingTimers)
-        .toString();
-  }
-
-  public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
-      throws Exception {
-    Preconditions.checkState(!newInputWatermark.isBefore(inputWatermarkTime),
-        "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
-        newInputWatermark);
-    inputWatermarkTime = newInputWatermark;
-    advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
-  }
-
-  public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
-      throws Exception {
-    Preconditions.checkState(!newProcessingTime.isBefore(processingTime),
-        "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
-    processingTime = newProcessingTime;
-    advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
-  }
-
-  private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain)
-      throws Exception {
-    PriorityQueue<TimerData> timers = queue(domain);
-    boolean shouldFire = false;
-
-    do {
-      TimerData timer = timers.peek();
-      // Timers fire if the new time is ahead of the timer
-      shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
-      if (shouldFire) {
-        // Remove before firing, so that if the trigger adds another identical
-        // timer we don't remove it.
-        timers.remove();
-        runner.onTimer(timer);
-      }
-    } while (shouldFire);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
deleted file mode 100644
index cd51062..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * Inserts rows into BigQuery.
- */
-public class BigQueryTableInserter {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
-
-  // Approximate amount of table data to upload per InsertAll request.
-  private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
-
-  // The maximum number of rows to upload per InsertAll request.
-  private static final long MAX_ROWS_PER_BATCH = 500;
-
-  // The maximum number of times to retry inserting rows into BigQuery.
-  private static final int MAX_INSERT_ATTEMPTS = 5;
-
-  // The initial backoff after a failure inserting rows into BigQuery.
-  private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
-
-  private final Bigquery client;
-  private final TableReference defaultRef;
-  private final long maxRowsPerBatch;
-
-  private static final ExecutorService executor = MoreExecutors.getExitingExecutorService(
-      (ThreadPoolExecutor) Executors.newFixedThreadPool(100), 10, TimeUnit.SECONDS);
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   */
-  public BigQueryTableInserter(Bigquery client) {
-    this.client = client;
-    this.defaultRef = null;
-    this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
-  }
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param defaultRef identifies the table to insert into
-   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery)}
-   */
-  @Deprecated
-  public BigQueryTableInserter(Bigquery client, TableReference defaultRef) {
-    this.client = client;
-    this.defaultRef = defaultRef;
-    this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
-  }
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   */
-  public BigQueryTableInserter(Bigquery client, int maxRowsPerBatch) {
-    this.client = client;
-    this.defaultRef = null;
-    this.maxRowsPerBatch = maxRowsPerBatch;
-  }
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param defaultRef identifies the default table to insert into
-   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, int)}
-   */
-  @Deprecated
-  public BigQueryTableInserter(Bigquery client, TableReference defaultRef, int maxRowsPerBatch) {
-    this.client = client;
-    this.defaultRef = defaultRef;
-    this.maxRowsPerBatch = maxRowsPerBatch;
-  }
-
-  /**
-   * Insert all rows from the given list.
-   *
-   * @deprecated replaced by {@link #insertAll(TableReference, List)}
-   */
-  @Deprecated
-  public void insertAll(List<TableRow> rowList) throws IOException {
-    insertAll(defaultRef, rowList, null, null);
-  }
-
-  /**
-   * Insert all rows from the given list using specified insertIds if not null.
-   *
-   * @deprecated replaced by {@link #insertAll(TableReference, List, List)}
-   */
-  @Deprecated
-  public void insertAll(List<TableRow> rowList,
-      @Nullable List<String> insertIdList) throws IOException {
-    insertAll(defaultRef, rowList, insertIdList, null);
-  }
-
-  /**
-   * Insert all rows from the given list.
-   */
-  public void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
-    insertAll(ref, rowList, null, null);
-  }
-
-  /**
-   * Insert all rows from the given list using specified insertIds if not null. Track count of
-   * bytes written with the Aggregator.
-   */
-  public void insertAll(TableReference ref, List<TableRow> rowList,
-      @Nullable List<String> insertIdList, Aggregator<Long, Long> byteCountAggregator)
-      throws IOException {
-    Preconditions.checkNotNull(ref, "ref");
-    if (insertIdList != null && rowList.size() != insertIdList.size()) {
-      throw new AssertionError("If insertIdList is not null it needs to have at least "
-          + "as many elements as rowList");
-    }
-
-    AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-        MAX_INSERT_ATTEMPTS,
-        INITIAL_INSERT_BACKOFF_INTERVAL_MS);
-
-    List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
-    // These lists contain the rows to publish. Initially the contain the entire list. If there are
-    // failures, they will contain only the failed rows to be retried.
-    List<TableRow> rowsToPublish = rowList;
-    List<String> idsToPublish = insertIdList;
-    while (true) {
-      List<TableRow> retryRows = new ArrayList<>();
-      List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
-
-      int strideIndex = 0;
-      // Upload in batches.
-      List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
-      int dataSize = 0;
-
-      List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
-      List<Integer> strideIndices = new ArrayList<>();
-
-      for (int i = 0; i < rowsToPublish.size(); ++i) {
-        TableRow row = rowsToPublish.get(i);
-        TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-        if (idsToPublish != null) {
-          out.setInsertId(idsToPublish.get(i));
-        }
-        out.setJson(row.getUnknownKeys());
-        rows.add(out);
-
-        dataSize += row.toString().length();
-        if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch ||
-            i == rowsToPublish.size() - 1) {
-          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-          content.setRows(rows);
-
-          final Bigquery.Tabledata.InsertAll insert = client.tabledata()
-              .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
-                  content);
-
-          futures.add(
-              executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
-                @Override
-                public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
-                  return insert.execute().getInsertErrors();
-                }
-              }));
-          strideIndices.add(strideIndex);
-
-          if (byteCountAggregator != null) {
-            byteCountAggregator.addValue(Long.valueOf(dataSize));
-          }
-          dataSize = 0;
-          strideIndex = i + 1;
-          rows = new LinkedList<>();
-        }
-      }
-
-      try {
-        for (int i = 0; i < futures.size(); i++) {
-          List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
-          if (errors != null) {
-            for (TableDataInsertAllResponse.InsertErrors error : errors) {
-              allErrors.add(error);
-              if (error.getIndex() == null) {
-                throw new IOException("Insert failed: " + allErrors);
-              }
-
-              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
-              retryRows.add(rowsToPublish.get(errorIndex));
-              if (retryIds != null) {
-                retryIds.add(idsToPublish.get(errorIndex));
-              }
-            }
-          }
-        }
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while inserting " + rowsToPublish);
-      } catch (ExecutionException e) {
-        Throwables.propagate(e.getCause());
-      }
-
-      if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
-        try {
-          Thread.sleep(backoff.nextBackOffMillis());
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
-        }
-        LOG.info("Retrying failed inserts to BigQuery");
-        rowsToPublish = retryRows;
-        idsToPublish = retryIds;
-        allErrors.clear();
-      } else {
-        break;
-      }
-    }
-    if (!allErrors.isEmpty()) {
-      throw new IOException("Insert failed: " + allErrors);
-    }
-  }
-
-  /**
-   * Retrieves or creates the table.
-   *
-   * <p>The table is checked to conform to insertion requirements as specified
-   * by WriteDisposition and CreateDisposition.
-   *
-   * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
-   * this will re-create the table if necessary to ensure it is empty.
-   *
-   * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
-   * will fail if the table exists and is not empty.
-   *
-   * <p>When constructing a table, a {@code TableSchema} must be available.  If a
-   * schema is provided, then it will be used.  If no schema is provided, but
-   * an existing table is being cleared (WRITE_TRUNCATE option above), then
-   * the existing schema will be re-used.  If no schema is available, then an
-   * {@code IOException} is thrown.
-   */
-  public Table getOrCreateTable(
-      TableReference ref,
-      WriteDisposition writeDisposition,
-      CreateDisposition createDisposition,
-      @Nullable TableSchema schema) throws IOException {
-    // Check if table already exists.
-    Bigquery.Tables.Get get = client.tables()
-        .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-    Table table = null;
-    try {
-      table = get.execute();
-    } catch (IOException e) {
-      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-      if (!errorExtractor.itemNotFound(e) ||
-          createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
-        // Rethrow.
-        throw e;
-      }
-    }
-
-    // If we want an empty table, and it isn't, then delete it first.
-    if (table != null) {
-      if (writeDisposition == WriteDisposition.WRITE_APPEND) {
-        return table;
-      }
-
-      boolean empty = isEmpty(ref);
-      if (empty) {
-        if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
-          LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
-        }
-        return table;
-
-      } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
-        throw new IOException("WriteDisposition is WRITE_EMPTY, "
-            + "but table is not empty");
-      }
-
-      // Reuse the existing schema if none was provided.
-      if (schema == null) {
-        schema = table.getSchema();
-      }
-
-      // Delete table and fall through to re-creating it below.
-      LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
-      Bigquery.Tables.Delete delete = client.tables()
-          .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-      delete.execute();
-    }
-
-    if (schema == null) {
-      throw new IllegalArgumentException(
-          "Table schema required for new table.");
-    }
-
-    // Create the table.
-    return tryCreateTable(ref, schema);
-  }
-
-  /**
-   * Checks if a table is empty.
-   */
-  public boolean isEmpty(TableReference ref) throws IOException {
-    Bigquery.Tabledata.List list = client.tabledata()
-        .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-    list.setMaxResults(1L);
-    TableDataList dataList = list.execute();
-
-    return dataList.getRows() == null || dataList.getRows().isEmpty();
-  }
-
-  /**
-   * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
-   * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
-   * configured with a table spec function to use different tables for each window.
-   */
-  private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
-
-  /**
-   * Tries to create the BigQuery table.
-   * If a table with the same name already exists in the dataset, the table
-   * creation fails, and the function returns null.  In such a case,
-   * the existing table doesn't necessarily have the same schema as specified
-   * by the parameter.
-   *
-   * @param schema Schema of the new BigQuery table.
-   * @return The newly created BigQuery table information, or null if the table
-   *     with the same name already exists.
-   * @throws IOException if other error than already existing table occurs.
-   */
-  @Nullable
-  public Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
-    LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
-    BackOff backoff =
-        new ExponentialBackOff.Builder()
-            .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
-            .build();
-
-    Table table = new Table().setTableReference(ref).setSchema(schema);
-    return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
-  }
-
-  @VisibleForTesting
-  @Nullable
-  Table tryCreateTable(
-      Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
-          throws IOException {
-    boolean retry = false;
-    while (true) {
-      try {
-        return client.tables().insert(projectId, datasetId, table).execute();
-      } catch (IOException e) {
-        ApiErrorExtractor extractor = new ApiErrorExtractor();
-        if (extractor.itemAlreadyExists(e)) {
-          // The table already exists, nothing to return.
-          return null;
-        } else if (extractor.rateLimited(e)) {
-          // The request failed because we hit a temporary quota. Back off and try again.
-          try {
-            if (BackOffUtils.next(sleeper, backoff)) {
-              if (!retry) {
-                LOG.info(
-                    "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
-                    projectId,
-                    datasetId,
-                    table.getTableReference().getTableId(),
-                    TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
-                retry = true;
-              }
-              continue;
-            }
-          } catch (InterruptedException e1) {
-            // Restore interrupted state and throw the last failure.
-            Thread.currentThread().interrupt();
-            throw e;
-          }
-        }
-        throw e;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
deleted file mode 100644
index c2c80f7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ClassInfo;
-import com.google.api.client.util.Data;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Random;
-
-import javax.annotation.Nullable;
-
-/**
- * Iterates over all rows in a table.
- */
-public class BigQueryTableRowIterator implements AutoCloseable {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class);
-
-  @Nullable private TableReference ref;
-  @Nullable private final String projectId;
-  @Nullable private TableSchema schema;
-  private final Bigquery client;
-  private String pageToken;
-  private Iterator<TableRow> iteratorOverCurrentBatch;
-  private TableRow current;
-  // Set true when the final page is seen from the service.
-  private boolean lastPage = false;
-
-  // The maximum number of times a BigQuery request will be retried
-  private static final int MAX_RETRIES = 3;
-  // Initial wait time for the backoff implementation
-  private static final Duration INITIAL_BACKOFF_TIME = Duration.standardSeconds(1);
-
-  // After sending a query to BQ service we will be polling the BQ service to check the status with
-  // following interval to check the status of query execution job
-  private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
-
-  private final String query;
-  // Whether to flatten query results.
-  private final boolean flattenResults;
-  // Temporary dataset used to store query results.
-  private String temporaryDatasetId = null;
-  // Temporary table used to store query results.
-  private String temporaryTableId = null;
-
-  private BigQueryTableRowIterator(
-      @Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
-      Bigquery client, boolean flattenResults) {
-    this.ref = ref;
-    this.query = query;
-    this.projectId = projectId;
-    this.client = checkNotNull(client, "client");
-    this.flattenResults = flattenResults;
-  }
-
-  /**
-   * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
-   */
-  public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
-    checkNotNull(ref, "ref");
-    checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true);
-  }
-
-  /**
-   * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
-   * specified query in the specified project.
-   */
-  public static BigQueryTableRowIterator fromQuery(
-      String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) {
-    checkNotNull(query, "query");
-    checkNotNull(projectId, "projectId");
-    checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(null, query, projectId, client,
-        MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
-  }
-
-  /**
-   * Opens the table for read.
-   * @throws IOException on failure
-   */
-  public void open() throws IOException, InterruptedException {
-    if (query != null) {
-      ref = executeQueryAndWaitForCompletion();
-    }
-    // Get table schema.
-    Bigquery.Tables.Get get =
-        client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-
-    Table table =
-        executeWithBackOff(
-            get,
-            "Error opening BigQuery table  %s of dataset %s  : {}",
-            ref.getTableId(),
-            ref.getDatasetId());
-    schema = table.getSchema();
-  }
-
-  public boolean advance() throws IOException, InterruptedException {
-    while (true) {
-      if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
-        // Embed schema information into the raw row, so that values have an
-        // associated key.  This matches how rows are read when using the
-        // DataflowPipelineRunner.
-        current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
-        return true;
-      }
-      if (lastPage) {
-        return false;
-      }
-
-      Bigquery.Tabledata.List list =
-          client.tabledata().list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-      if (pageToken != null) {
-        list.setPageToken(pageToken);
-      }
-
-      TableDataList result =
-          executeWithBackOff(
-              list,
-              "Error reading from BigQuery table %s of dataset %s : {}",
-              ref.getTableId(),
-              ref.getDatasetId());
-
-      pageToken = result.getPageToken();
-      iteratorOverCurrentBatch =
-          result.getRows() != null
-              ? result.getRows().iterator()
-              : Collections.<TableRow>emptyIterator();
-
-      // The server may return a page token indefinitely on a zero-length table.
-      if (pageToken == null || result.getTotalRows() != null && result.getTotalRows() == 0) {
-        lastPage = true;
-      }
-    }
-  }
-
-  public TableRow getCurrent() {
-    if (current == null) {
-      throw new NoSuchElementException();
-    }
-    return current;
-  }
-
-  /**
-   * Adjusts a field returned from the BigQuery API to match what we will receive when running
-   * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
-   * used for batch jobs executed on the Cloud Dataflow service.
-   *
-   * <p>The following is the relationship between BigQuery schema and Java types:
-   *
-   * <ul>
-   *   <li>Nulls are {@code null}.
-   *   <li>Repeated fields are {@code List} of objects.
-   *   <li>Record columns are {@link TableRow} objects.
-   *   <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
-   *   <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
-   *   <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
-   *       {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
-   *       zeros and can be 1 to 6 digits long.
-   *   <li>Every other atomic type is a {@code String}.
-   * </ul>
-   *
-   * <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
-   *
-   * <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
-   * and are not accessible through the {@link TableRow#getF} function.
-   */
-  @Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
-    if (Data.isNull(v)) {
-      return null;
-    }
-
-    if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
-      TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
-      @SuppressWarnings("unchecked")
-      List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
-      ImmutableList.Builder<Object> values = ImmutableList.builder();
-      for (Map<String, Object> element : rawCells) {
-        values.add(getTypedCellValue(elementSchema, element.get("v")));
-      }
-      return values.build();
-    }
-
-    if (fieldSchema.getType().equals("RECORD")) {
-      @SuppressWarnings("unchecked")
-      Map<String, Object> typedV = (Map<String, Object>) v;
-      return getTypedTableRow(fieldSchema.getFields(), typedV);
-    }
-
-    if (fieldSchema.getType().equals("FLOAT")) {
-      return Double.parseDouble((String) v);
-    }
-
-    if (fieldSchema.getType().equals("BOOLEAN")) {
-      return Boolean.parseBoolean((String) v);
-    }
-
-    if (fieldSchema.getType().equals("TIMESTAMP")) {
-      return AvroUtils.formatTimestamp((String) v);
-    }
-
-    return v;
-  }
-
-  /**
-   * A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
-   * because they are reserved keywords in {@link TableRow}.
-   */
-  // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
-  // not indirect through our broken use of {@link TableRow}.
-  //     See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
-  private static final Collection<String> RESERVED_FIELD_NAMES =
-      ClassInfo.of(TableRow.class).getNames();
-
-  /**
-   * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
-   * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
-   * the cells are converted to Java types according to the provided field schemas.
-   *
-   * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
-   * types are mapped to Java types.
-   */
-  private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
-    // If rawRow is a TableRow, use it. If not, create a new one.
-    TableRow row;
-    List<? extends Map<String, Object>> cells;
-    if (rawRow instanceof TableRow) {
-      // Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
-      // any type conversion, but extract the cells for cell-wise processing below.
-      row = (TableRow) rawRow;
-      cells = row.getF();
-      // Clear the cells from the row, so that row.getF() will return null. This matches the
-      // behavior of rows produced by the BigQuery export API used on the service.
-      row.setF(null);
-    } else {
-      row = new TableRow();
-
-      // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
-      // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
-      // we will use Map.get("v") instead of TableCell.getV() get its value.
-      @SuppressWarnings("unchecked")
-      List<? extends Map<String, Object>> rawCells =
-          (List<? extends Map<String, Object>>) rawRow.get("f");
-      cells = rawCells;
-    }
-
-    checkState(cells.size() == fields.size(),
-        "Expected that the row has the same number of cells %s as fields in the schema %s",
-        cells.size(), fields.size());
-
-    // Loop through all the fields in the row, normalizing their types with the TableFieldSchema
-    // and storing the normalized values by field name in the Map<String, Object> that
-    // underlies the TableRow.
-    Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
-    Iterator<TableFieldSchema> fieldIt = fields.iterator();
-    while (cellIt.hasNext()) {
-      Map<String, Object> cell = cellIt.next();
-      TableFieldSchema fieldSchema = fieldIt.next();
-
-      // Convert the object in this cell to the Java type corresponding to its type in the schema.
-      Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
-
-      String fieldName = fieldSchema.getName();
-      checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
-          "BigQueryIO does not support records with columns named %s", fieldName);
-
-      if (convertedValue == null) {
-        // BigQuery does not include null values when the export operation (to JSON) is used.
-        // To match that behavior, BigQueryTableRowiterator, and the DirectPipelineRunner,
-        // intentionally omits columns with null values.
-        continue;
-      }
-
-      row.set(fieldName, convertedValue);
-    }
-    return row;
-  }
-
-  // Create a new BigQuery dataset
-  private void createDataset(String datasetId) throws IOException, InterruptedException {
-    Dataset dataset = new Dataset();
-    DatasetReference reference = new DatasetReference();
-    reference.setProjectId(projectId);
-    reference.setDatasetId(datasetId);
-    dataset.setDatasetReference(reference);
-
-    String createDatasetError =
-        "Error when trying to create the temporary dataset " + datasetId + " in project "
-        + projectId;
-    executeWithBackOff(
-        client.datasets().insert(projectId, dataset), createDatasetError + " :{}");
-  }
-
-  // Delete the given table that is available in the given dataset.
-  private void deleteTable(String datasetId, String tableId)
-      throws IOException, InterruptedException {
-    executeWithBackOff(
-        client.tables().delete(projectId, datasetId, tableId),
-        "Error when trying to delete the temporary table " + datasetId + " in dataset " + datasetId
-        + " of project " + projectId + ". Manual deletion may be required. Error message : {}");
-  }
-
-  // Delete the given dataset. This will fail if the given dataset has any tables.
-  private void deleteDataset(String datasetId) throws IOException, InterruptedException {
-    executeWithBackOff(
-        client.datasets().delete(projectId, datasetId),
-        "Error when trying to delete the temporary dataset " + datasetId + " in project "
-        + projectId + ". Manual deletion may be required. Error message : {}");
-  }
-
-  /**
-   * Executes the specified query and returns a reference to the temporary BigQuery table created
-   * to hold the results.
-   *
-   * @throws IOException if the query fails.
-   */
-  private TableReference executeQueryAndWaitForCompletion()
-      throws IOException, InterruptedException {
-    // Create a temporary dataset to store results.
-    // Starting dataset name with an "_" so that it is hidden.
-    Random rnd = new Random(System.currentTimeMillis());
-    temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
-    temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
-
-    createDataset(temporaryDatasetId);
-    Job job = new Job();
-    JobConfiguration config = new JobConfiguration();
-    JobConfigurationQuery queryConfig = new JobConfigurationQuery();
-    config.setQuery(queryConfig);
-    job.setConfiguration(config);
-    queryConfig.setQuery(query);
-    queryConfig.setAllowLargeResults(true);
-    queryConfig.setFlattenResults(flattenResults);
-
-    TableReference destinationTable = new TableReference();
-    destinationTable.setProjectId(projectId);
-    destinationTable.setDatasetId(temporaryDatasetId);
-    destinationTable.setTableId(temporaryTableId);
-    queryConfig.setDestinationTable(destinationTable);
-
-    Insert insert = client.jobs().insert(projectId, job);
-    Job queryJob = executeWithBackOff(
-        insert, "Error when trying to execute the job for query " + query + " :{}");
-    JobReference jobId = queryJob.getJobReference();
-
-    while (true) {
-      Job pollJob = executeWithBackOff(
-          client.jobs().get(projectId, jobId.getJobId()),
-          "Error when trying to get status of the job for query " + query + " :{}");
-      JobStatus status = pollJob.getStatus();
-      if (status.getState().equals("DONE")) {
-        // Job is DONE, but did not necessarily succeed.
-        ErrorProto error = status.getErrorResult();
-        if (error == null) {
-          return pollJob.getConfiguration().getQuery().getDestinationTable();
-        } else {
-          // There will be no temporary table to delete, so null out the reference.
-          temporaryTableId = null;
-          throw new IOException("Executing query " + query + " failed: " + error.getMessage());
-        }
-      }
-      try {
-        Thread.sleep(QUERY_COMPLETION_POLL_TIME.getMillis());
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  // Execute a BQ request with exponential backoff and return the result.
-  // client - BQ request to be executed
-  // error - Formatted message to log if when a request fails. Takes exception message as a
-  // formatter parameter.
-  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
-      Object... errorArgs) throws IOException, InterruptedException {
-    Sleeper sleeper = Sleeper.DEFAULT;
-    BackOff backOff =
-        new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
-
-    T result = null;
-    while (true) {
-      try {
-        result = client.execute();
-        break;
-      } catch (IOException e) {
-        LOG.error(String.format(error, errorArgs), e.getMessage());
-        if (!BackOffUtils.next(sleeper, backOff)) {
-          LOG.error(
-              String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times.");
-          throw e;
-        }
-      }
-    }
-
-    return result;
-  }
-
-  @Override
-  public void close() {
-    // Prevent any further requests.
-    lastPage = true;
-
-    try {
-      // Deleting temporary table and dataset that gets generated when executing a query.
-      if (temporaryDatasetId != null) {
-        if (temporaryTableId != null) {
-          deleteTable(temporaryDatasetId, temporaryTableId);
-        }
-        deleteDataset(temporaryDatasetId);
-      }
-    } catch (IOException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java
deleted file mode 100644
index f3a039a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.BitSet;
-
-/**
- * Coder for the BitSet used to track child-trigger finished states.
- */
-class BitSetCoder extends AtomicCoder<BitSet> {
-
-  private static final BitSetCoder INSTANCE = new BitSetCoder();
-  private transient ByteArrayCoder byteArrayCoder = ByteArrayCoder.of();
-
-  private BitSetCoder() {}
-
-  public static BitSetCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(BitSet value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    byteArrayCoder.encodeAndOwn(value.toByteArray(), outStream, context);
-  }
-
-  @Override
-  public BitSet decode(InputStream inStream, Context context)
-      throws CoderException, IOException {
-    return BitSet.valueOf(byteArrayCoder.decode(inStream, context));
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "BitSetCoder requires its byteArrayCoder to be deterministic.",
-        byteArrayCoder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java
deleted file mode 100644
index e8e693a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder.Context;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-/**
- * Provides an efficient encoding for {@link Iterable}s containing small values by
- * buffering up to {@code bufferSize} bytes of data before prefixing the count.
- * Note that each element needs to be encoded in a nested context. See
- * {@link Context Coder.Context} for more details.
- *
- * <p>To use this stream:
- * <pre><code>
- * BufferedElementCountingOutputStream os = ...
- * for (Element E : elements) {
- *   os.markElementStart();
- *   // write an element to os
- * }
- * os.finish();
- * </code></pre>
- *
- * <p>The resulting output stream is:
- * <pre>
- * countA element(0) element(1) ... element(countA - 1)
- * countB element(0) element(1) ... element(countB - 1)
- * ...
- * countX element(0) element(1) ... element(countX - 1)
- * countY
- * </pre>
- *
- * <p>To read this stream:
- * <pre><code>
- * InputStream is = ...
- * long count;
- * do {
- *   count = VarInt.decodeLong(is);
- *   for (int i = 0; i < count; ++i) {
- *     // read an element from is
- *   }
- * } while(count > 0);
- * </code></pre>
- *
- * <p>The counts are encoded as variable length longs. See {@link VarInt#encode(long, OutputStream)}
- * for more details. The end of the iterable is detected by reading a count of 0.
- */
-@NotThreadSafe
-public class BufferedElementCountingOutputStream extends OutputStream {
-  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-  private final ByteBuffer buffer;
-  private final OutputStream os;
-  private boolean finished;
-  private long count;
-
-  /**
-   * Creates an output stream which encodes the number of elements output to it in a streaming
-   * manner.
-   */
-  public BufferedElementCountingOutputStream(OutputStream os) {
-    this(os, DEFAULT_BUFFER_SIZE);
-  }
-
-  /**
-   * Creates an output stream which encodes the number of elements output to it in a streaming
-   * manner with the given {@code bufferSize}.
-   */
-  BufferedElementCountingOutputStream(OutputStream os, int bufferSize) {
-    this.buffer = ByteBuffer.allocate(bufferSize);
-    this.os = os;
-    this.finished = false;
-    this.count = 0;
-  }
-
-  /**
-   * Finishes the encoding by flushing any buffered data,
-   * and outputting a final count of 0.
-   */
-  public void finish() throws IOException {
-    if (finished) {
-      return;
-    }
-    flush();
-    // Finish the stream by stating that there are 0 elements that follow.
-    VarInt.encode(0, os);
-    finished = true;
-  }
-
-  /**
-   * Marks that a new element is being output. This allows this output stream
-   * to use the buffer if it had previously overflowed marking the start of a new
-   * block of elements.
-   */
-  public void markElementStart() throws IOException {
-    if (finished) {
-      throw new IOException("Stream has been finished. Can not add any more elements.");
-    }
-    count++;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    if (finished) {
-      throw new IOException("Stream has been finished. Can not write any more data.");
-    }
-    if (count == 0) {
-      os.write(b);
-      return;
-    }
-
-    if (buffer.hasRemaining()) {
-      buffer.put((byte) b);
-    } else {
-      outputBuffer();
-      os.write(b);
-    }
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    if (finished) {
-      throw new IOException("Stream has been finished. Can not write any more data.");
-    }
-    if (count == 0) {
-      os.write(b, off, len);
-      return;
-    }
-
-    if (buffer.remaining() >= len) {
-      buffer.put(b, off, len);
-    } else {
-      outputBuffer();
-      os.write(b, off, len);
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    if (finished) {
-      return;
-    }
-    outputBuffer();
-    os.flush();
-  }
-
-  @Override
-  public void close() throws IOException {
-    finish();
-    os.close();
-  }
-
-  // Output the buffer if it contains any data.
-  private void outputBuffer() throws IOException {
-    if (count > 0) {
-      VarInt.encode(count, os);
-      // We are using a heap based buffer and not a direct buffer so it is safe to access
-      // the underlying array.
-      os.write(buffer.array(), buffer.arrayOffset(), buffer.position());
-      buffer.clear();
-      // The buffer has been flushed so we must write to the underlying stream until
-      // we learn of the next element. We reset the count to zero marking that we should
-      // not use the buffer.
-      count = 0;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java
deleted file mode 100644
index 8b41eb8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/** A utility for manipulating well-known cloud types. */
-enum CloudKnownType {
-  TEXT("http://schema.org/Text", String.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      return clazz.cast(value);
-    }
-  },
-  BOOLEAN("http://schema.org/Boolean", Boolean.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      return clazz.cast(value);
-    }
-  },
-  INTEGER("http://schema.org/Integer", Long.class, Integer.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      Object result = null;
-      if (value.getClass() == clazz) {
-        result = value;
-      } else if (clazz == Long.class) {
-        if (value instanceof Integer) {
-          result = ((Integer) value).longValue();
-        } else if (value instanceof String) {
-          result = Long.valueOf((String) value);
-        }
-      } else if (clazz == Integer.class) {
-        if (value instanceof Long) {
-          result = ((Long) value).intValue();
-        } else if (value instanceof String) {
-          result = Integer.valueOf((String) value);
-        }
-      }
-      return clazz.cast(result);
-    }
-  },
-  FLOAT("http://schema.org/Float", Double.class, Float.class) {
-    @Override
-    public <T> T parse(Object value, Class<T> clazz) {
-      Object result = null;
-      if (value.getClass() == clazz) {
-        result = value;
-      } else if (clazz == Double.class) {
-        if (value instanceof Float) {
-          result = ((Float) value).doubleValue();
-        } else if (value instanceof String) {
-          result = Double.valueOf((String) value);
-        }
-      } else if (clazz == Float.class) {
-        if (value instanceof Double) {
-          result = ((Double) value).floatValue();
-        } else if (value instanceof String) {
-          result = Float.valueOf((String) value);
-        }
-      }
-      return clazz.cast(result);
-    }
-  };
-
-  private final String uri;
-  private final Class<?>[] classes;
-
-  private CloudKnownType(String uri, Class<?>... classes) {
-    this.uri = uri;
-    this.classes = classes;
-  }
-
-  public String getUri() {
-    return uri;
-  }
-
-  public abstract <T> T parse(Object value, Class<T> clazz);
-
-  public Class<?> defaultClass() {
-    return classes[0];
-  }
-
-  private static final Map<String, CloudKnownType> typesByUri =
-      Collections.unmodifiableMap(buildTypesByUri());
-
-  private static Map<String, CloudKnownType> buildTypesByUri() {
-    Map<String, CloudKnownType> result = new HashMap<>();
-    for (CloudKnownType ty : CloudKnownType.values()) {
-      result.put(ty.getUri(), ty);
-    }
-    return result;
-  }
-
-  @Nullable
-  public static CloudKnownType forUri(@Nullable String uri) {
-    if (uri == null) {
-      return null;
-    }
-    return typesByUri.get(uri);
-  }
-
-  private static final Map<Class<?>, CloudKnownType> typesByClass =
-  Collections.unmodifiableMap(buildTypesByClass());
-
-  private static Map<Class<?>, CloudKnownType> buildTypesByClass() {
-    Map<Class<?>, CloudKnownType> result = new HashMap<>();
-    for (CloudKnownType ty : CloudKnownType.values()) {
-      for (Class<?> clazz : ty.classes) {
-        result.put(clazz, ty);
-      }
-    }
-    return result;
-  }
-
-  @Nullable
-  public static CloudKnownType forClass(Class<?> clazz) {
-    return typesByClass.get(clazz);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java
deleted file mode 100644
index 8c704bf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.api.client.util.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A representation of an arbitrary Java object to be instantiated by Dataflow
- * workers.
- *
- * <p>Typically, an object to be written by the SDK to the Dataflow service will
- * implement a method (typically called {@code asCloudObject()}) that returns a
- * {@code CloudObject} to represent the object in the protocol.  Once the
- * {@code CloudObject} is constructed, the method should explicitly add
- * additional properties to be presented during deserialization, representing
- * child objects by building additional {@code CloudObject}s.
- */
-public final class CloudObject extends GenericJson {
-  /**
-   * Constructs a {@code CloudObject} by copying the supplied serialized object
-   * spec, which must represent an SDK object serialized for transport via the
-   * Dataflow API.
-   *
-   * <p>The most common use of this method is during deserialization on the worker,
-   * where it's used as a binding type during instance construction.
-   *
-   * @param spec supplies the serialized form of the object as a nested map
-   * @throws RuntimeException if the supplied map does not represent an SDK object
-   */
-  public static CloudObject fromSpec(Map<String, Object> spec) {
-    CloudObject result = new CloudObject();
-    result.putAll(spec);
-    if (result.className == null) {
-      throw new RuntimeException("Unable to create an SDK object from " + spec
-          + ": Object class not specified (missing \""
-          + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
-    }
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} to be used for serializing an instance of
-   * the supplied class for transport via the Dataflow API.  The instance
-   * parameters to be serialized must be supplied explicitly after the
-   * {@code CloudObject} is created, by using {@link CloudObject#put}.
-   *
-   * @param cls the class to use when deserializing the object on the worker
-   */
-  public static CloudObject forClass(Class<?> cls) {
-    CloudObject result = new CloudObject();
-    result.className = checkNotNull(cls).getName();
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} to be used for serializing data to be
-   * deserialized using the supplied class name the supplied class name for
-   * transport via the Dataflow API.  The instance parameters to be serialized
-   * must be supplied explicitly after the {@code CloudObject} is created, by
-   * using {@link CloudObject#put}.
-   *
-   * @param className the class to use when deserializing the object on the worker
-   */
-  public static CloudObject forClassName(String className) {
-    CloudObject result = new CloudObject();
-    result.className = checkNotNull(className);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forString(String value) {
-    CloudObject result = forClassName(CloudKnownType.TEXT.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forBoolean(Boolean value) {
-    CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forInteger(Long value) {
-    CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forInteger(Integer value) {
-    CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forFloat(Float value) {
-    CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value.
-   * @param value the scalar value to represent.
-   */
-  public static CloudObject forFloat(Double value) {
-    CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  /**
-   * Constructs a {@code CloudObject} representing the given value of a
-   * well-known cloud object type.
-   * @param value the scalar value to represent.
-   * @throws RuntimeException if the value does not have a
-   * {@link CloudKnownType} mapping
-   */
-  public static CloudObject forKnownType(Object value) {
-    @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass());
-    if (ty == null) {
-      throw new RuntimeException("Unable to represent value via the Dataflow API: " + value);
-    }
-    CloudObject result = forClassName(ty.getUri());
-    result.put(PropertyNames.SCALAR_FIELD_NAME, value);
-    return result;
-  }
-
-  @Key(PropertyNames.OBJECT_TYPE_NAME)
-  private String className;
-
-  private CloudObject() {}
-
-  /**
-   * Gets the name of the Java class that this CloudObject represents.
-   */
-  public String getClassName() {
-    return className;
-  }
-
-  @Override
-  public CloudObject clone() {
-    return (CloudObject) super.clone();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java
deleted file mode 100644
index ddab933..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-
-import com.google.api.client.util.Base64;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoderBase;
-import com.google.cloud.dataflow.sdk.coders.MapCoder;
-import com.google.cloud.dataflow.sdk.coders.MapCoderBase;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Throwables;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
-import com.fasterxml.jackson.databind.DatabindContext;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
-import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.type.TypeFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.ref.SoftReference;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.TypeVariable;
-
-/**
- * Utilities for working with Coders: encoding values to and decoding them from byte arrays
- * and Base64 strings, cloning values through a Coder, and building cloud encodings.
- */
-public final class CoderUtils {
-  private CoderUtils() {}  // Non-instantiable
-
-  /**
-   * Coder class-name alias for a key-value type.
-   */
-  public static final String KIND_PAIR = "kind:pair";
-
-  /**
-   * Coder class-name alias for a stream type.
-   */
-  public static final String KIND_STREAM = "kind:stream";
-
-  /**
-   * Per-thread output buffer reused by top-level {@code encodeToByteArray} calls. It is held
-   * through a {@link SoftReference}; a new buffer is created whenever the reference has been
-   * cleared.
-   */
-  private static final ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
-      threadLocalOutputStream = new ThreadLocal<>();
-
-  /**
-   * If true, a call to {@code encodeToByteArray} is already on the call stack.
-   */
-  private static final ThreadLocal<Boolean> threadLocalOutputStreamInUse =
-      new ThreadLocal<Boolean>() {
-        @Override
-        protected Boolean initialValue() {
-          return false;
-        }
-      };
-
-  /**
-   * Encodes the given value using the specified Coder in the {@code OUTER} context, and
-   * returns the encoded bytes.
-   *
-   * <p>Nested calls on the same thread (for example from a method of the provided
-   * {@link Coder}) are handled: see
-   * {@link #encodeToByteArray(Coder, Object, Coder.Context)}.
-   *
-   * @throws CoderException if the coder fails to encode the value
-   */
-  public static <T> byte[] encodeToByteArray(Coder<T> coder, T value) throws CoderException {
-    return encodeToByteArray(coder, value, Coder.Context.OUTER);
-  }
-
-  /**
-   * Encodes the given value using the specified Coder and context, and returns the encoded
-   * bytes.
-   *
-   * <p>The outermost call on a thread encodes into the shared thread-local buffer. A call
-   * made while that buffer is in use encodes into a freshly allocated buffer instead, so
-   * nested calls do not overwrite the bytes of the enclosing call.
-   *
-   * @throws CoderException if the coder fails to encode the value
-   */
-  public static <T> byte[] encodeToByteArray(Coder<T> coder, T value, Coder.Context context)
-      throws CoderException {
-    if (threadLocalOutputStreamInUse.get()) {
-      // encodeToByteArray() is called recursively and the thread local stream is in use,
-      // allocating a new one.
-      ByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
-      encodeToSafeStream(coder, value, stream, context);
-      return stream.toByteArray();
-    } else {
-      threadLocalOutputStreamInUse.set(true);
-      try {
-        ByteArrayOutputStream stream = getThreadLocalOutputStream();
-        encodeToSafeStream(coder, value, stream, context);
-        return stream.toByteArray();
-      } finally {
-        // Release the shared buffer even when encoding throws.
-        threadLocalOutputStreamInUse.set(false);
-      }
-    }
-  }
-
-  /**
-   * Encodes {@code value} to the given {@code stream}, which should be a stream that never throws
-   * {@code IOException}, such as {@code ByteArrayOutputStream} or
-   * {@link ExposedByteArrayOutputStream}.
-   *
-   * @throws CoderException if the coder raises one
-   * @throws IllegalArgumentException if any other {@code IOException} is raised
-   */
-  private static <T> void encodeToSafeStream(
-      Coder<T> coder, T value, OutputStream stream, Coder.Context context) throws CoderException {
-    try {
-      coder.encode(value, new UnownedOutputStream(stream), context);
-    } catch (IOException exn) {
-      // Rethrows exn unchanged when it is a CoderException; anything else falls through.
-      Throwables.propagateIfPossible(exn, CoderException.class);
-      throw new IllegalArgumentException(
-          "Forbidden IOException when writing to OutputStream", exn);
-    }
-  }
-
-  /**
-   * Decodes the given bytes using the specified Coder in the {@code OUTER} context, and
-   * returns the resulting decoded value.
-   *
-   * @throws CoderException if decoding fails or bytes remain after decoding
-   */
-  public static <T> T decodeFromByteArray(Coder<T> coder, byte[] encodedValue)
-      throws CoderException {
-    return decodeFromByteArray(coder, encodedValue, Coder.Context.OUTER);
-  }
-
-  /**
-   * Decodes the given bytes using the specified Coder and context, and returns the resulting
-   * decoded value.
-   *
-   * @throws CoderException if decoding fails, or if the coder did not consume every byte of
-   *     {@code encodedValue}
-   */
-  public static <T> T decodeFromByteArray(
-      Coder<T> coder, byte[] encodedValue, Coder.Context context) throws CoderException {
-    try (ExposedByteArrayInputStream stream = new ExposedByteArrayInputStream(encodedValue)) {
-      T result = decodeFromSafeStream(coder, stream, context);
-      if (stream.available() != 0) {
-        throw new CoderException(
-            stream.available() + " unexpected extra bytes after decoding " + result);
-      }
-      return result;
-    }
-  }
-
-  /**
-   * Decodes a value from the given {@code stream}, which should be a stream that never throws
-   * {@code IOException}, such as {@code ByteArrayInputStream} or
-   * {@link ExposedByteArrayInputStream}.
-   *
-   * @throws CoderException if the coder raises one
-   * @throws IllegalArgumentException if any other {@code IOException} is raised
-   */
-  private static <T> T decodeFromSafeStream(
-      Coder<T> coder, InputStream stream, Coder.Context context) throws CoderException {
-    try {
-      return coder.decode(new UnownedInputStream(stream), context);
-    } catch (IOException exn) {
-      // Rethrows exn unchanged when it is a CoderException; anything else falls through.
-      Throwables.propagateIfPossible(exn, CoderException.class);
-      throw new IllegalArgumentException(
-          "Forbidden IOException when reading from InputStream", exn);
-    }
-  }
-
-  /**
-   * Returns the calling thread's cached output stream, creating and caching a new one when
-   * none is cached or the soft reference has been cleared. The stream is reset before it is
-   * returned.
-   */
-  private static ByteArrayOutputStream getThreadLocalOutputStream() {
-    SoftReference<ExposedByteArrayOutputStream> refStream = threadLocalOutputStream.get();
-    ExposedByteArrayOutputStream stream = refStream == null ? null : refStream.get();
-    if (stream == null) {
-      stream = new ExposedByteArrayOutputStream();
-      threadLocalOutputStream.set(new SoftReference<>(stream));
-    }
-    stream.reset();
-    return stream;
-  }
-
-  /**
-   * Clones the given value by encoding and then decoding it with the specified Coder, both in
-   * the {@code OUTER} context.
-   *
-   * @throws CoderException if encoding or decoding fails
-   */
-  public static <T> T clone(Coder<T> coder, T value) throws CoderException {
-    return decodeFromByteArray(coder, encodeToByteArray(coder, value, Coder.Context.OUTER));
-  }
-
-  /**
-   * Encodes the given value using the specified Coder, and returns the Base64 encoding of the
-   * encoded bytes.
-   *
-   * @throws CoderException if there are errors during encoding.
-   */
-  public static <T> String encodeToBase64(Coder<T> coder, T value)
-      throws CoderException {
-    byte[] rawValue = encodeToByteArray(coder, value);
-    return Base64.encodeBase64URLSafeString(rawValue);
-  }
-
-  /**
-   * Parses a value from a base64-encoded String using the given coder in the {@code OUTER}
-   * context.
-   *
-   * <p>Unlike {@link #decodeFromByteArray(Coder, byte[], Coder.Context)}, this method does not
-   * check whether bytes remain after the value has been decoded.
-   *
-   * @throws CoderException if the coder fails to decode the value
-   */
-  public static <T> T decodeFromBase64(Coder<T> coder, String encodedValue) throws CoderException {
-    return decodeFromSafeStream(
-        coder, new ByteArrayInputStream(Base64.decodeBase64(encodedValue)), Coder.Context.OUTER);
-  }
-
-  /**
-   * If {@code coderDescriptor} describes a subclass of {@code Coder<T>} for a specific
-   * type {@code T}, returns a descriptor for {@code T}.
-   *
-   * <p>The {@code Coder} supertype of {@code coderDescriptor} is cast to
-   * {@link ParameterizedType}, and its first type argument is returned.
-   */
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public static TypeDescriptor getCodedType(TypeDescriptor coderDescriptor) {
-    ParameterizedType coderType =
-        (ParameterizedType) coderDescriptor.getSupertype(Coder.class).getType();
-    TypeDescriptor codedType = TypeDescriptor.of(coderType.getActualTypeArguments()[0]);
-    return codedType;
-  }
-
-  /**
-   * Creates a {@link CloudObject} for the class name {@code type}. When at least one component
-   * spec is supplied, the specs are added as a list under
-   * {@link PropertyNames#COMPONENT_ENCODINGS}.
-   */
-  public static CloudObject makeCloudEncoding(
-      String type,
-      CloudObject... componentSpecs) {
-    CloudObject encoding = CloudObject.forClassName(type);
-    if (componentSpecs.length > 0) {
-      addList(encoding, PropertyNames.COMPONENT_ENCODINGS, componentSpecs);
-    }
-    return encoding;
-  }
-
-  /**
-   * A {@link com.fasterxml.jackson.databind.Module} that adds the type
-   * resolver needed for Coder definitions created by the Dataflow service.
-   */
-  static final class Jackson2Module extends SimpleModule {
-    /**
-     * The Coder custom type resolver.
-     *
-     * <p>This resolver resolves coders.  If the Coder ID is a particular
-     * well-known identifier supplied by the Dataflow service, it's replaced
-     * with the corresponding class.  All other Coder instances are resolved
-     * by class name, using the package com.google.cloud.dataflow.sdk.coders
-     * if there are no "."s in the ID.
-     */
-    private static final class Resolver extends TypeIdResolverBase {
-      @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
-      public Resolver() {
-        super(TypeFactory.defaultInstance().constructType(Coder.class),
-            TypeFactory.defaultInstance());
-      }
-
-      @Deprecated
-      @Override
-      public JavaType typeFromId(String id) {
-        return typeFromId(null, id);
-      }
-
-      /**
-       * Resolves {@code id} to a class and returns it as a simple Jackson type in which every
-       * type parameter is the unknown type. {@link KvCoder} and {@link MapCoder} are
-       * replaced by {@link KvCoderBase} and {@link MapCoderBase} respectively.
-       */
-      @Override
-      public JavaType typeFromId(DatabindContext context, String id) {
-        Class<?> clazz = getClassForId(id);
-        if (clazz == KvCoder.class) {
-          clazz = KvCoderBase.class;
-        }
-        if (clazz == MapCoder.class) {
-          clazz = MapCoderBase.class;
-        }
-        @SuppressWarnings("rawtypes")
-        TypeVariable[] tvs = clazz.getTypeParameters();
-        JavaType[] types = new JavaType[tvs.length];
-        for (int i = 0; i < tvs.length; i++) {
-          types[i] = TypeFactory.unknownType();
-        }
-        return _typeFactory.constructSimpleType(clazz, types);
-      }
-
-      /**
-       * Maps a coder ID to its class: an ID containing "." is loaded as a fully qualified
-       * class name, the {@code KIND_STREAM} and {@code KIND_PAIR} aliases map to
-       * {@link IterableCoder} and {@link KvCoder}, and any other ID is looked up in the
-       * package of {@link Coder}.
-       *
-       * @throws RuntimeException if no class can be loaded for {@code id}
-       */
-      private Class<?> getClassForId(String id) {
-        try {
-          if (id.contains(".")) {
-            return Class.forName(id);
-          }
-
-          if (id.equals(KIND_STREAM)) {
-            return IterableCoder.class;
-          } else if (id.equals(KIND_PAIR)) {
-            return KvCoder.class;
-          }
-
-          // Otherwise, see if the ID is the name of a class in
-          // com.google.cloud.dataflow.sdk.coders.  We do this via creating
-          // the class object so that class loaders have a chance to get
-          // involved -- and since we need the class object anyway.
-          return Class.forName(Coder.class.getPackage().getName() + "." + id);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
-        }
-      }
-
-      @Override
-      public String idFromValueAndType(Object o, Class<?> clazz) {
-        return clazz.getName();
-      }
-
-      @Override
-      public String idFromValue(Object o) {
-        return o.getClass().getName();
-      }
-
-      @Override
-      public JsonTypeInfo.Id getMechanism() {
-        return JsonTypeInfo.Id.CUSTOM;
-      }
-    }
-
-    /**
-     * The mixin class defining how Coders are handled by the deserialization
-     * {@link ObjectMapper}.
-     *
-     * <p>This is done via a mixin so that this resolver is <i>only</i> used
-     * during deserialization requested by the Dataflow SDK.
-     */
-    @JsonTypeIdResolver(Resolver.class)
-    @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
-    private static final class Mixin {}
-
-    /** Creates the module and registers {@code Mixin} as the mix-in for {@link Coder}. */
-    public Jackson2Module() {
-      super("DataflowCoders");
-      setMixInAnnotation(Coder.class, Mixin.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java
deleted file mode 100644
index 6f2b89b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.state.StateContext;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-/**
- * Factory for {@link Context} instances (the context type declared in
- * {@code CombineWithContext}), built from the different places that can supply pipeline
- * options and side inputs.
- */
-public class CombineContextFactory {
-
-  /** Shared context whose accessors always throw; returned by {@link #nullContext()}. */
-  private static final Context NULL_CONTEXT = new Context() {
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new IllegalArgumentException("cannot call sideInput() in a null context");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context");
-    }
-  };
-
-  /**
-   * Returns a placeholder {@link Context} for tests. Both of its accessors throw
-   * {@link IllegalArgumentException}.
-   */
-  public static Context nullContext() {
-    return NULL_CONTEXT;
-  }
-
-  /**
-   * Returns a {@link Context} that forwards every call to the given
-   * {@code DoFn.ProcessContext}.
-   */
-  public static Context createFromProcessContext(final DoFn<?, ?>.ProcessContext c) {
-    return new Context() {
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-    };
-  }
-
-  /**
-   * Returns a {@link Context} that forwards every call to the given {@link StateContext}.
-   */
-  public static Context createFromStateContext(final StateContext<?> c) {
-    return new Context() {
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-    };
-  }
-
-  /**
-   * Returns a {@link Context} assembled from {@code PipelineOptions}, a
-   * {@code SideInputReader}, and the main input window.
-   *
-   * <p>A side input is read from the window that the view's window function reports as the
-   * side-input window for {@code mainInputWindow}. Asking for a view the reader does not
-   * contain raises {@link IllegalArgumentException}.
-   */
-  public static Context createFromComponents(final PipelineOptions options,
-      final SideInputReader sideInputReader, final BoundedWindow mainInputWindow) {
-    return new Context() {
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        if (sideInputReader.contains(view)) {
-          BoundedWindow windowForView =
-              view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-          return sideInputReader.get(view, windowForView);
-        }
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return options;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
deleted file mode 100644
index d974480..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
+++ /dev/null
@@ -1,154 +0,0 @@
-
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.util.state.StateContext;
-
-import java.io.IOException;
-import java.io.NotSerializableException;
-import java.io.ObjectOutputStream;
-
-/**
- * Static helpers that adapt combine functions between their context-aware and
- * context-free forms.
- */
-public class CombineFnUtil {
-  /**
-   * Binds a {@link KeyedCombineFnWithContext} to the context derived from
-   * {@code stateContext}, yielding a plain {@link KeyedCombineFn} that passes that context to
-   * every delegated call.
-   *
-   * <p>The returned {@link KeyedCombineFn} cannot be serialized.
-   */
-  public static <K, InputT, AccumT, OutputT> KeyedCombineFn<K, InputT, AccumT, OutputT>
-  bindContext(
-      KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
-      StateContext<?> stateContext) {
-    return new NonSerializableBoundedKeyedCombineFn<>(
-        combineFn, CombineContextFactory.createFromStateContext(stateContext));
-  }
-
-  /**
-   * Returns a {@link CombineFnWithContext} view of the given {@link GlobalCombineFn}.
-   *
-   * <p>A function that already is a {@link CombineFnWithContext} is returned as is. Anything
-   * else is cast to {@link CombineFn} and wrapped in an adapter that ignores the context
-   * argument.
-   */
-  public static <InputT, AccumT, OutputT>
-      CombineFnWithContext<InputT, AccumT, OutputT> toFnWithContext(
-          GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
-    if (globalCombineFn instanceof CombineFnWithContext) {
-      @SuppressWarnings("unchecked")
-      CombineFnWithContext<InputT, AccumT, OutputT> alreadyWithContext =
-          (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn;
-      return alreadyWithContext;
-    }
-
-    @SuppressWarnings("unchecked")
-    final CombineFn<InputT, AccumT, OutputT> combineFn =
-        (CombineFn<InputT, AccumT, OutputT>) globalCombineFn;
-    // Every override below drops the Context argument and forwards to the wrapped CombineFn.
-    return new CombineFnWithContext<InputT, AccumT, OutputT>() {
-      @Override
-      public AccumT createAccumulator(Context c) {
-        return combineFn.createAccumulator();
-      }
-
-      @Override
-      public AccumT addInput(AccumT accumulator, InputT input, Context c) {
-        return combineFn.addInput(accumulator, input);
-      }
-
-      @Override
-      public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
-        return combineFn.mergeAccumulators(accumulators);
-      }
-
-      @Override
-      public OutputT extractOutput(AccumT accumulator, Context c) {
-        return combineFn.extractOutput(accumulator);
-      }
-
-      @Override
-      public AccumT compact(AccumT accumulator, Context c) {
-        return combineFn.compact(accumulator);
-      }
-
-      @Override
-      public OutputT defaultValue() {
-        return combineFn.defaultValue();
-      }
-
-      @Override
-      public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-          throws CannotProvideCoderException {
-        return combineFn.getAccumulatorCoder(registry, inputCoder);
-      }
-
-      @Override
-      public Coder<OutputT> getDefaultOutputCoder(
-          CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
-        return combineFn.getDefaultOutputCoder(registry, inputCoder);
-      }
-    };
-  }
-
-  /**
-   * {@link KeyedCombineFn} that forwards each call to a {@link KeyedCombineFnWithContext},
-   * supplying one fixed {@link Context}. Attempts to serialize it fail in
-   * {@code writeObject}.
-   */
-  private static class NonSerializableBoundedKeyedCombineFn<K, InputT, AccumT, OutputT>
-      extends KeyedCombineFn<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
-    private final Context context;
-
-    private NonSerializableBoundedKeyedCombineFn(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
-        Context context) {
-      this.combineFn = combineFn;
-      this.context = context;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key) {
-      return combineFn.createAccumulator(key, context);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT value) {
-      return combineFn.addInput(key, accumulator, value, context);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators, context);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator) {
-      return combineFn.extractOutput(key, accumulator, context);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator) {
-      return combineFn.compact(key, accumulator, context);
-    }
-
-    @Override
-    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return combineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
-    }
-
-    @Override
-    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
-    }
-
-    /** Serialization hook that always refuses to write this object. */
-    private void writeObject(@SuppressWarnings("unused") ObjectOutputStream out)
-        throws IOException {
-      throw new NotSerializableException(
-          "Cannot serialize the CombineFn resulting from CombineFnUtil.bindContext.");
-    }
-  }
-}