Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:38 UTC
[14/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java
deleted file mode 100644
index b6a1493..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * TimerInternals that uses priority queues to manage the timers that are ready to fire.
- */
-public class BatchTimerInternals implements TimerInternals {
- /** Set of timers that are scheduled, used for deduplicating timers. */
- private Set<TimerData> existingTimers = new HashSet<>();
-
- // Keep these queues separate so we can advance over them separately.
- private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
- private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
- private Instant inputWatermarkTime;
- private Instant processingTime;
-
- private PriorityQueue<TimerData> queue(TimeDomain domain) {
- return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers;
- }
-
- public BatchTimerInternals(Instant processingTime) {
- this.processingTime = processingTime;
- this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- @Override
- public void setTimer(TimerData timer) {
- if (existingTimers.add(timer)) {
- queue(timer.getDomain()).add(timer);
- }
- }
-
- @Override
- public void deleteTimer(TimerData timer) {
- existingTimers.remove(timer);
- queue(timer.getDomain()).remove(timer);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return processingTime;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
- * is already complete.
- */
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return inputWatermarkTime;
- }
-
- @Override
- @Nullable
- public Instant currentOutputWatermarkTime() {
- // The output watermark is always undefined in batch mode.
- return null;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("watermarkTimers", watermarkTimers)
- .add("processingTimers", processingTimers)
- .toString();
- }
-
- public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
- throws Exception {
- Preconditions.checkState(!newInputWatermark.isBefore(inputWatermarkTime),
- "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
- newInputWatermark);
- inputWatermarkTime = newInputWatermark;
- advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
- }
-
- public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
- throws Exception {
- Preconditions.checkState(!newProcessingTime.isBefore(processingTime),
- "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
- processingTime = newProcessingTime;
- advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
- }
-
- private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain)
- throws Exception {
- PriorityQueue<TimerData> timers = queue(domain);
- boolean shouldFire = false;
-
- do {
- TimerData timer = timers.peek();
- // Timers fire if the new time is ahead of the timer
- shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
- if (shouldFire) {
- // Remove before firing, so that if the trigger adds another identical
- // timer we don't remove it.
- timers.remove();
- runner.onTimer(timer);
- }
- } while (shouldFire);
- }
-}
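
A minimal usage sketch of the removed BatchTimerInternals (not part of this commit). It assumes TimerInternals.TimerData.of(namespace, timestamp, domain) and StateNamespaces.global() from the surrounding SDK, and `runner` stands in for a caller-owned ReduceFnRunner:

    BatchTimerInternals timerInternals = new BatchTimerInternals(new Instant(0));
    timerInternals.setTimer(TimerInternals.TimerData.of(
        StateNamespaces.global(), new Instant(1000L), TimeDomain.EVENT_TIME));
    // In batch mode timers only fire when the caller advances time; moving the
    // input watermark to the end of time fires every pending event-time timer
    // exactly once via runner.onTimer(...).
    timerInternals.advanceInputWatermark(runner, BoundedWindow.TIMESTAMP_MAX_VALUE);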
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
deleted file mode 100644
index cd51062..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * Inserts rows into BigQuery.
- */
-public class BigQueryTableInserter {
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
-
- // Approximate amount of table data to upload per InsertAll request.
- private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
-
- // The maximum number of rows to upload per InsertAll request.
- private static final long MAX_ROWS_PER_BATCH = 500;
-
- // The maximum number of times to retry inserting rows into BigQuery.
- private static final int MAX_INSERT_ATTEMPTS = 5;
-
- // The initial backoff after a failure inserting rows into BigQuery.
- private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
-
- private final Bigquery client;
- private final TableReference defaultRef;
- private final long maxRowsPerBatch;
-
- private static final ExecutorService executor = MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) Executors.newFixedThreadPool(100), 10, TimeUnit.SECONDS);
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- */
- public BigQueryTableInserter(Bigquery client) {
- this.client = client;
- this.defaultRef = null;
- this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
- }
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- * @param defaultRef identifies the table to insert into
- * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery)}
- */
- @Deprecated
- public BigQueryTableInserter(Bigquery client, TableReference defaultRef) {
- this.client = client;
- this.defaultRef = defaultRef;
- this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
- }
-
- /**
- * Constructs a new row inserter.
- * @param client a BigQuery client
- * @param maxRowsPerBatch the maximum number of rows to insert per InsertAll request
- */
- public BigQueryTableInserter(Bigquery client, int maxRowsPerBatch) {
- this.client = client;
- this.defaultRef = null;
- this.maxRowsPerBatch = maxRowsPerBatch;
- }
-
- /**
- * Constructs a new row inserter.
- * @param client a BigQuery client
- * @param defaultRef identifies the default table to insert into
- * @param maxRowsPerBatch the maximum number of rows to insert per InsertAll request
- * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, int)}
- */
- @Deprecated
- public BigQueryTableInserter(Bigquery client, TableReference defaultRef, int maxRowsPerBatch) {
- this.client = client;
- this.defaultRef = defaultRef;
- this.maxRowsPerBatch = maxRowsPerBatch;
- }
-
- /**
- * Insert all rows from the given list.
- *
- * @deprecated replaced by {@link #insertAll(TableReference, List)}
- */
- @Deprecated
- public void insertAll(List<TableRow> rowList) throws IOException {
- insertAll(defaultRef, rowList, null, null);
- }
-
- /**
- * Insert all rows from the given list, using the specified insertIds if not null.
- *
- * @deprecated replaced by {@link #insertAll(TableReference, List, List)}
- */
- @Deprecated
- public void insertAll(List<TableRow> rowList,
- @Nullable List<String> insertIdList) throws IOException {
- insertAll(defaultRef, rowList, insertIdList, null);
- }
-
- /**
- * Insert all rows from the given list.
- */
- public void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
- insertAll(ref, rowList, null, null);
- }
-
- /**
- * Insert all rows from the given list, using the specified insertIds if not null. Tracks the
- * count of bytes written with the Aggregator.
- */
- public void insertAll(TableReference ref, List<TableRow> rowList,
- @Nullable List<String> insertIdList, Aggregator<Long, Long> byteCountAggregator)
- throws IOException {
- Preconditions.checkNotNull(ref, "ref");
- if (insertIdList != null && rowList.size() != insertIdList.size()) {
- throw new AssertionError("If insertIdList is not null it needs to have at least "
- + "as many elements as rowList");
- }
-
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_INSERT_ATTEMPTS,
- INITIAL_INSERT_BACKOFF_INTERVAL_MS);
-
- List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
- // These lists contain the rows to publish. Initially they contain the entire list. If there are
- // failures, they will contain only the failed rows to be retried.
- List<TableRow> rowsToPublish = rowList;
- List<String> idsToPublish = insertIdList;
- while (true) {
- List<TableRow> retryRows = new ArrayList<>();
- List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
-
- int strideIndex = 0;
- // Upload in batches.
- List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
- int dataSize = 0;
-
- List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
- List<Integer> strideIndices = new ArrayList<>();
-
- for (int i = 0; i < rowsToPublish.size(); ++i) {
- TableRow row = rowsToPublish.get(i);
- TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
- if (idsToPublish != null) {
- out.setInsertId(idsToPublish.get(i));
- }
- out.setJson(row.getUnknownKeys());
- rows.add(out);
-
- dataSize += row.toString().length();
- if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch ||
- i == rowsToPublish.size() - 1) {
- TableDataInsertAllRequest content = new TableDataInsertAllRequest();
- content.setRows(rows);
-
- final Bigquery.Tabledata.InsertAll insert = client.tabledata()
- .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
- content);
-
- futures.add(
- executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
- @Override
- public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
- return insert.execute().getInsertErrors();
- }
- }));
- strideIndices.add(strideIndex);
-
- if (byteCountAggregator != null) {
- byteCountAggregator.addValue(Long.valueOf(dataSize));
- }
- dataSize = 0;
- strideIndex = i + 1;
- rows = new LinkedList<>();
- }
- }
-
- try {
- for (int i = 0; i < futures.size(); i++) {
- List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
- if (errors != null) {
- for (TableDataInsertAllResponse.InsertErrors error : errors) {
- allErrors.add(error);
- if (error.getIndex() == null) {
- throw new IOException("Insert failed: " + allErrors);
- }
-
- int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
- retryRows.add(rowsToPublish.get(errorIndex));
- if (retryIds != null) {
- retryIds.add(idsToPublish.get(errorIndex));
- }
- }
- }
- }
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while inserting " + rowsToPublish);
- } catch (ExecutionException e) {
- Throwables.propagate(e.getCause());
- }
-
- if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
- try {
- Thread.sleep(backoff.nextBackOffMillis());
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
- }
- LOG.info("Retrying failed inserts to BigQuery");
- rowsToPublish = retryRows;
- idsToPublish = retryIds;
- allErrors.clear();
- } else {
- break;
- }
- }
- if (!allErrors.isEmpty()) {
- throw new IOException("Insert failed: " + allErrors);
- }
- }
-
- /**
- * Retrieves or creates the table.
- *
- * <p>The table is checked to conform to insertion requirements as specified
- * by WriteDisposition and CreateDisposition.
- *
- * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
- * this will re-create the table if necessary to ensure it is empty.
- *
- * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
- * will fail if the table exists and is not empty.
- *
- * <p>When constructing a table, a {@code TableSchema} must be available. If a
- * schema is provided, then it will be used. If no schema is provided, but
- * an existing table is being cleared (WRITE_TRUNCATE option above), then
- * the existing schema will be re-used. If no schema is available, then an
- * {@code IOException} is thrown.
- */
- public Table getOrCreateTable(
- TableReference ref,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- @Nullable TableSchema schema) throws IOException {
- // Check if table already exists.
- Bigquery.Tables.Get get = client.tables()
- .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- Table table = null;
- try {
- table = get.execute();
- } catch (IOException e) {
- ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
- if (!errorExtractor.itemNotFound(e) ||
- createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
- // Rethrow.
- throw e;
- }
- }
-
- // If we want an empty table, and it isn't, then delete it first.
- if (table != null) {
- if (writeDisposition == WriteDisposition.WRITE_APPEND) {
- return table;
- }
-
- boolean empty = isEmpty(ref);
- if (empty) {
- if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
- LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
- }
- return table;
-
- } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
- throw new IOException("WriteDisposition is WRITE_EMPTY, "
- + "but table is not empty");
- }
-
- // Reuse the existing schema if none was provided.
- if (schema == null) {
- schema = table.getSchema();
- }
-
- // Delete table and fall through to re-creating it below.
- LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
- Bigquery.Tables.Delete delete = client.tables()
- .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- delete.execute();
- }
-
- if (schema == null) {
- throw new IllegalArgumentException(
- "Table schema required for new table.");
- }
-
- // Create the table.
- return tryCreateTable(ref, schema);
- }
-
- /**
- * Checks if a table is empty.
- */
- public boolean isEmpty(TableReference ref) throws IOException {
- Bigquery.Tabledata.List list = client.tabledata()
- .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- list.setMaxResults(1L);
- TableDataList dataList = list.execute();
-
- return dataList.getRows() == null || dataList.getRows().isEmpty();
- }
-
- /**
- * Retry table creation for up to 5 minutes (with exponential backoff) when the user is near the
- * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
- * configured with a table spec function to use different tables for each window.
- */
- private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
-
- /**
- * Tries to create the BigQuery table.
- * If a table with the same name already exists in the dataset, the table
- * creation fails, and the function returns null. In such a case,
- * the existing table doesn't necessarily have the same schema as specified
- * by the parameter.
- *
- * @param schema Schema of the new BigQuery table.
- * @return The newly created BigQuery table information, or null if the table
- * with the same name already exists.
- * @throws IOException if an error other than the table already existing occurs.
- */
- @Nullable
- public Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
- LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
- BackOff backoff =
- new ExponentialBackOff.Builder()
- .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
- .build();
-
- Table table = new Table().setTableReference(ref).setSchema(schema);
- return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
- }
-
- @VisibleForTesting
- @Nullable
- Table tryCreateTable(
- Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
- throws IOException {
- boolean retry = false;
- while (true) {
- try {
- return client.tables().insert(projectId, datasetId, table).execute();
- } catch (IOException e) {
- ApiErrorExtractor extractor = new ApiErrorExtractor();
- if (extractor.itemAlreadyExists(e)) {
- // The table already exists, nothing to return.
- return null;
- } else if (extractor.rateLimited(e)) {
- // The request failed because we hit a temporary quota. Back off and try again.
- try {
- if (BackOffUtils.next(sleeper, backoff)) {
- if (!retry) {
- LOG.info(
- "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
- projectId,
- datasetId,
- table.getTableReference().getTableId(),
- TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
- retry = true;
- }
- continue;
- }
- } catch (InterruptedException e1) {
- // Restore interrupted state and throw the last failure.
- Thread.currentThread().interrupt();
- throw e;
- }
- }
- throw e;
- }
- }
- }
-}
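
A hedged usage sketch for the removed BigQueryTableInserter (not part of this commit; `client`, `ref`, and `schema` are assumed to be an authenticated Bigquery client, a populated TableReference, and a TableSchema). insertAll batches rows at roughly 64 KB or 500 rows per request and retries failed rows with bounded exponential backoff:

    Table writeSomeRows(Bigquery client, TableReference ref, TableSchema schema)
        throws IOException {
      BigQueryTableInserter inserter = new BigQueryTableInserter(client);
      // Conforms the table to the dispositions before writing.
      Table table = inserter.getOrCreateTable(
          ref, WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_IF_NEEDED, schema);
      inserter.insertAll(ref, Arrays.asList(
          new TableRow().set("word", "beam"),
          new TableRow().set("word", "dataflow")));
      return table;
    }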
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
deleted file mode 100644
index c2c80f7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ClassInfo;
-import com.google.api.client.util.Data;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Random;
-
-import javax.annotation.Nullable;
-
-/**
- * Iterates over all rows in a table.
- */
-public class BigQueryTableRowIterator implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class);
-
- @Nullable private TableReference ref;
- @Nullable private final String projectId;
- @Nullable private TableSchema schema;
- private final Bigquery client;
- private String pageToken;
- private Iterator<TableRow> iteratorOverCurrentBatch;
- private TableRow current;
- // Set true when the final page is seen from the service.
- private boolean lastPage = false;
-
- // The maximum number of times a BigQuery request will be retried
- private static final int MAX_RETRIES = 3;
- // Initial wait time for the backoff implementation
- private static final Duration INITIAL_BACKOFF_TIME = Duration.standardSeconds(1);
-
- // After sending a query to the BQ service, we poll it at the following interval to check
- // the status of the query execution job.
- private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
-
- private final String query;
- // Whether to flatten query results.
- private final boolean flattenResults;
- // Temporary dataset used to store query results.
- private String temporaryDatasetId = null;
- // Temporary table used to store query results.
- private String temporaryTableId = null;
-
- private BigQueryTableRowIterator(
- @Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
- Bigquery client, boolean flattenResults) {
- this.ref = ref;
- this.query = query;
- this.projectId = projectId;
- this.client = checkNotNull(client, "client");
- this.flattenResults = flattenResults;
- }
-
- /**
- * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
- */
- public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
- checkNotNull(ref, "ref");
- checkNotNull(client, "client");
- return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true);
- }
-
- /**
- * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
- * specified query in the specified project.
- */
- public static BigQueryTableRowIterator fromQuery(
- String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) {
- checkNotNull(query, "query");
- checkNotNull(projectId, "projectId");
- checkNotNull(client, "client");
- return new BigQueryTableRowIterator(null, query, projectId, client,
- MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
- }
-
- /**
- * Opens the table for read.
- * @throws IOException on failure
- */
- public void open() throws IOException, InterruptedException {
- if (query != null) {
- ref = executeQueryAndWaitForCompletion();
- }
- // Get table schema.
- Bigquery.Tables.Get get =
- client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-
- Table table =
- executeWithBackOff(
- get,
- "Error opening BigQuery table %s of dataset %s : {}",
- ref.getTableId(),
- ref.getDatasetId());
- schema = table.getSchema();
- }
-
- public boolean advance() throws IOException, InterruptedException {
- while (true) {
- if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
- // Embed schema information into the raw row, so that values have an
- // associated key. This matches how rows are read when using the
- // DataflowPipelineRunner.
- current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
- return true;
- }
- if (lastPage) {
- return false;
- }
-
- Bigquery.Tabledata.List list =
- client.tabledata().list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- if (pageToken != null) {
- list.setPageToken(pageToken);
- }
-
- TableDataList result =
- executeWithBackOff(
- list,
- "Error reading from BigQuery table %s of dataset %s : {}",
- ref.getTableId(),
- ref.getDatasetId());
-
- pageToken = result.getPageToken();
- iteratorOverCurrentBatch =
- result.getRows() != null
- ? result.getRows().iterator()
- : Collections.<TableRow>emptyIterator();
-
- // The server may return a page token indefinitely on a zero-length table.
- if (pageToken == null || result.getTotalRows() != null && result.getTotalRows() == 0) {
- lastPage = true;
- }
- }
- }
-
- public TableRow getCurrent() {
- if (current == null) {
- throw new NoSuchElementException();
- }
- return current;
- }
-
- /**
- * Adjusts a field returned from the BigQuery API to match what we will receive when running
- * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
- * used for batch jobs executed on the Cloud Dataflow service.
- *
- * <p>The following is the relationship between BigQuery schema and Java types:
- *
- * <ul>
- * <li>Nulls are {@code null}.
- * <li>Repeated fields are {@code List} of objects.
- * <li>Record columns are {@link TableRow} objects.
- * <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
- * <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
- * <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
- * {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
- * zeros and can be 1 to 6 digits long.
- * <li>Every other atomic type is a {@code String}.
- * </ul>
- *
- * <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
- *
- * <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
- * and are not accessible through the {@link TableRow#getF} function.
- */
- @Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
- if (Data.isNull(v)) {
- return null;
- }
-
- if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
- TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
- ImmutableList.Builder<Object> values = ImmutableList.builder();
- for (Map<String, Object> element : rawCells) {
- values.add(getTypedCellValue(elementSchema, element.get("v")));
- }
- return values.build();
- }
-
- if (fieldSchema.getType().equals("RECORD")) {
- @SuppressWarnings("unchecked")
- Map<String, Object> typedV = (Map<String, Object>) v;
- return getTypedTableRow(fieldSchema.getFields(), typedV);
- }
-
- if (fieldSchema.getType().equals("FLOAT")) {
- return Double.parseDouble((String) v);
- }
-
- if (fieldSchema.getType().equals("BOOLEAN")) {
- return Boolean.parseBoolean((String) v);
- }
-
- if (fieldSchema.getType().equals("TIMESTAMP")) {
- return AvroUtils.formatTimestamp((String) v);
- }
-
- return v;
- }
-
- /**
- * A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
- * because they are reserved keywords in {@link TableRow}.
- */
- // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
- // not indirect through our broken use of {@link TableRow}.
- // See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
- private static final Collection<String> RESERVED_FIELD_NAMES =
- ClassInfo.of(TableRow.class).getNames();
-
- /**
- * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
- * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
- * the cells are converted to Java types according to the provided field schemas.
- *
- * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
- * types are mapped to Java types.
- */
- private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
- // If rawRow is a TableRow, use it. If not, create a new one.
- TableRow row;
- List<? extends Map<String, Object>> cells;
- if (rawRow instanceof TableRow) {
- // Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
- // any type conversion, but extract the cells for cell-wise processing below.
- row = (TableRow) rawRow;
- cells = row.getF();
- // Clear the cells from the row, so that row.getF() will return null. This matches the
- // behavior of rows produced by the BigQuery export API used on the service.
- row.setF(null);
- } else {
- row = new TableRow();
-
- // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
- // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
- // we will use Map.get("v") instead of TableCell.getV() get its value.
- @SuppressWarnings("unchecked")
- List<? extends Map<String, Object>> rawCells =
- (List<? extends Map<String, Object>>) rawRow.get("f");
- cells = rawCells;
- }
-
- checkState(cells.size() == fields.size(),
- "Expected that the row has the same number of cells %s as fields in the schema %s",
- cells.size(), fields.size());
-
- // Loop through all the fields in the row, normalizing their types with the TableFieldSchema
- // and storing the normalized values by field name in the Map<String, Object> that
- // underlies the TableRow.
- Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
- Iterator<TableFieldSchema> fieldIt = fields.iterator();
- while (cellIt.hasNext()) {
- Map<String, Object> cell = cellIt.next();
- TableFieldSchema fieldSchema = fieldIt.next();
-
- // Convert the object in this cell to the Java type corresponding to its type in the schema.
- Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
-
- String fieldName = fieldSchema.getName();
- checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
- "BigQueryIO does not support records with columns named %s", fieldName);
-
- if (convertedValue == null) {
- // BigQuery does not include null values when the export operation (to JSON) is used.
- // To match that behavior, BigQueryTableRowIterator, and the DirectPipelineRunner,
- // intentionally omits columns with null values.
- continue;
- }
-
- row.set(fieldName, convertedValue);
- }
- return row;
- }
-
- // Create a new BigQuery dataset
- private void createDataset(String datasetId) throws IOException, InterruptedException {
- Dataset dataset = new Dataset();
- DatasetReference reference = new DatasetReference();
- reference.setProjectId(projectId);
- reference.setDatasetId(datasetId);
- dataset.setDatasetReference(reference);
-
- String createDatasetError =
- "Error when trying to create the temporary dataset " + datasetId + " in project "
- + projectId;
- executeWithBackOff(
- client.datasets().insert(projectId, dataset), createDatasetError + " :{}");
- }
-
- // Delete the given table that is available in the given dataset.
- private void deleteTable(String datasetId, String tableId)
- throws IOException, InterruptedException {
- executeWithBackOff(
- client.tables().delete(projectId, datasetId, tableId),
- "Error when trying to delete the temporary table " + datasetId + " in dataset " + datasetId
- + " of project " + projectId + ". Manual deletion may be required. Error message : {}");
- }
-
- // Delete the given dataset. This will fail if the given dataset has any tables.
- private void deleteDataset(String datasetId) throws IOException, InterruptedException {
- executeWithBackOff(
- client.datasets().delete(projectId, datasetId),
- "Error when trying to delete the temporary dataset " + datasetId + " in project "
- + projectId + ". Manual deletion may be required. Error message : {}");
- }
-
- /**
- * Executes the specified query and returns a reference to the temporary BigQuery table created
- * to hold the results.
- *
- * @throws IOException if the query fails.
- */
- private TableReference executeQueryAndWaitForCompletion()
- throws IOException, InterruptedException {
- // Create a temporary dataset to store results.
- // Start the dataset name with an "_" so that it is hidden.
- Random rnd = new Random(System.currentTimeMillis());
- temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
- temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
-
- createDataset(temporaryDatasetId);
- Job job = new Job();
- JobConfiguration config = new JobConfiguration();
- JobConfigurationQuery queryConfig = new JobConfigurationQuery();
- config.setQuery(queryConfig);
- job.setConfiguration(config);
- queryConfig.setQuery(query);
- queryConfig.setAllowLargeResults(true);
- queryConfig.setFlattenResults(flattenResults);
-
- TableReference destinationTable = new TableReference();
- destinationTable.setProjectId(projectId);
- destinationTable.setDatasetId(temporaryDatasetId);
- destinationTable.setTableId(temporaryTableId);
- queryConfig.setDestinationTable(destinationTable);
-
- Insert insert = client.jobs().insert(projectId, job);
- Job queryJob = executeWithBackOff(
- insert, "Error when trying to execute the job for query " + query + " :{}");
- JobReference jobId = queryJob.getJobReference();
-
- while (true) {
- Job pollJob = executeWithBackOff(
- client.jobs().get(projectId, jobId.getJobId()),
- "Error when trying to get status of the job for query " + query + " :{}");
- JobStatus status = pollJob.getStatus();
- if (status.getState().equals("DONE")) {
- // Job is DONE, but did not necessarily succeed.
- ErrorProto error = status.getErrorResult();
- if (error == null) {
- return pollJob.getConfiguration().getQuery().getDestinationTable();
- } else {
- // There will be no temporary table to delete, so null out the reference.
- temporaryTableId = null;
- throw new IOException("Executing query " + query + " failed: " + error.getMessage());
- }
- }
- try {
- Thread.sleep(QUERY_COMPLETION_POLL_TIME.getMillis());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- // Execute a BQ request with exponential backoff and return the result.
- // client - BQ request to be executed
- // error - Formatted message to log when a request fails. Takes the exception message as a
- // formatter parameter.
- public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
- Object... errorArgs) throws IOException, InterruptedException {
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backOff =
- new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
-
- T result = null;
- while (true) {
- try {
- result = client.execute();
- break;
- } catch (IOException e) {
- LOG.error(String.format(error, errorArgs), e.getMessage());
- if (!BackOffUtils.next(sleeper, backOff)) {
- LOG.error(
- String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times.");
- throw e;
- }
- }
- }
-
- return result;
- }
-
- @Override
- public void close() {
- // Prevent any further requests.
- lastPage = true;
-
- try {
- // Delete the temporary table and dataset that get generated when executing a query.
- if (temporaryDatasetId != null) {
- if (temporaryTableId != null) {
- deleteTable(temporaryDatasetId, temporaryTableId);
- }
- deleteDataset(temporaryDatasetId);
- }
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- }
-}
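
A usage sketch for the removed BigQueryTableRowIterator (not part of this commit). Because the class is AutoCloseable, try-with-resources guarantees that the hidden temporary dataset created for query results is cleaned up:

    void dumpQueryResults(Bigquery client, String projectId)
        throws IOException, InterruptedException {
      try (BigQueryTableRowIterator it = BigQueryTableRowIterator.fromQuery(
          "SELECT word FROM corpus", projectId, client, null /* flatten by default */)) {
        it.open();                  // executes the query into a temporary table
        while (it.advance()) {
          TableRow row = it.getCurrent();  // typed per the schema rules above
          System.out.println(row);
        }
      }                             // close() deletes the temporary table and dataset
    }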
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java
deleted file mode 100644
index f3a039a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BitSetCoder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.BitSet;
-
-/**
- * Coder for the BitSet used to track child-trigger finished states.
- */
-class BitSetCoder extends AtomicCoder<BitSet> {
-
- private static final BitSetCoder INSTANCE = new BitSetCoder();
- private transient ByteArrayCoder byteArrayCoder = ByteArrayCoder.of();
-
- private BitSetCoder() {}
-
- public static BitSetCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(BitSet value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- byteArrayCoder.encodeAndOwn(value.toByteArray(), outStream, context);
- }
-
- @Override
- public BitSet decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return BitSet.valueOf(byteArrayCoder.decode(inStream, context));
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(
- "BitSetCoder requires its byteArrayCoder to be deterministic.",
- byteArrayCoder);
- }
-}
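
A round-trip sketch for the removed BitSetCoder (not part of this commit; the coder is package-private, so same-package access is assumed, and Coder.Context.OUTER is the SDK's outermost encoding context):

    BitSet bits = new BitSet();
    bits.set(3);
    bits.set(42);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BitSetCoder.of().encode(bits, out, Coder.Context.OUTER);
    BitSet decoded = BitSetCoder.of().decode(
        new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    // decoded.equals(bits) holds; determinism is delegated to ByteArrayCoder.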
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java
deleted file mode 100644
index e8e693a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BufferedElementCountingOutputStream.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder.Context;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-/**
- * Provides an efficient encoding for {@link Iterable}s containing small values by
- * buffering up to {@code bufferSize} bytes of data before prefixing the count.
- * Note that each element needs to be encoded in a nested context. See
- * {@link Context Coder.Context} for more details.
- *
- * <p>To use this stream:
- * <pre><code>
- * BufferedElementCountingOutputStream os = ...
- * for (Element E : elements) {
- * os.markElementStart();
- * // write an element to os
- * }
- * os.finish();
- * </code></pre>
- *
- * <p>The resulting output stream is:
- * <pre>
- * countA element(0) element(1) ... element(countA - 1)
- * countB element(0) element(1) ... element(countB - 1)
- * ...
- * countX element(0) element(1) ... element(countX - 1)
- * countY
- * </pre>
- *
- * <p>To read this stream:
- * <pre><code>
- * InputStream is = ...
- * long count;
- * do {
- * count = VarInt.decodeLong(is);
- * for (int i = 0; i < count; ++i) {
- * // read an element from is
- * }
- * } while(count > 0);
- * </code></pre>
- *
- * <p>The counts are encoded as variable length longs. See {@link VarInt#encode(long, OutputStream)}
- * for more details. The end of the iterable is detected by reading a count of 0.
- */
-@NotThreadSafe
-public class BufferedElementCountingOutputStream extends OutputStream {
- public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private final ByteBuffer buffer;
- private final OutputStream os;
- private boolean finished;
- private long count;
-
- /**
- * Creates an output stream which encodes the number of elements output to it in a streaming
- * manner.
- */
- public BufferedElementCountingOutputStream(OutputStream os) {
- this(os, DEFAULT_BUFFER_SIZE);
- }
-
- /**
- * Creates an output stream which encodes the number of elements output to it in a streaming
- * manner with the given {@code bufferSize}.
- */
- BufferedElementCountingOutputStream(OutputStream os, int bufferSize) {
- this.buffer = ByteBuffer.allocate(bufferSize);
- this.os = os;
- this.finished = false;
- this.count = 0;
- }
-
- /**
- * Finishes the encoding by flushing any buffered data,
- * and outputting a final count of 0.
- */
- public void finish() throws IOException {
- if (finished) {
- return;
- }
- flush();
- // Finish the stream by stating that there are 0 elements that follow.
- VarInt.encode(0, os);
- finished = true;
- }
-
- /**
- * Marks that a new element is being output. This allows this output stream
- * to resume using the buffer if it had previously overflowed, marking the
- * start of a new block of elements.
- */
- public void markElementStart() throws IOException {
- if (finished) {
- throw new IOException("Stream has been finished. Can not add any more elements.");
- }
- count++;
- }
-
- @Override
- public void write(int b) throws IOException {
- if (finished) {
- throw new IOException("Stream has been finished. Can not write any more data.");
- }
- if (count == 0) {
- os.write(b);
- return;
- }
-
- if (buffer.hasRemaining()) {
- buffer.put((byte) b);
- } else {
- outputBuffer();
- os.write(b);
- }
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- if (finished) {
- throw new IOException("Stream has been finished. Can not write any more data.");
- }
- if (count == 0) {
- os.write(b, off, len);
- return;
- }
-
- if (buffer.remaining() >= len) {
- buffer.put(b, off, len);
- } else {
- outputBuffer();
- os.write(b, off, len);
- }
- }
-
- @Override
- public void flush() throws IOException {
- if (finished) {
- return;
- }
- outputBuffer();
- os.flush();
- }
-
- @Override
- public void close() throws IOException {
- finish();
- os.close();
- }
-
- // Output the buffer if it contains any data.
- private void outputBuffer() throws IOException {
- if (count > 0) {
- VarInt.encode(count, os);
- // We are using a heap based buffer and not a direct buffer so it is safe to access
- // the underlying array.
- os.write(buffer.array(), buffer.arrayOffset(), buffer.position());
- buffer.clear();
- // The buffer has been flushed so we must write to the underlying stream until
- // we learn of the next element. We reset the count to zero marking that we should
- // not use the buffer.
- count = 0;
- }
- }
-}
-
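
A concrete sketch of the write-side pattern from the javadoc above (not part of this commit). Raw bytes stand in for elements here; in real use each element would be written by a Coder in the nested context so a reader can delimit it:

    void writeFramed(List<byte[]> elements, OutputStream sink) throws IOException {
      BufferedElementCountingOutputStream os =
          new BufferedElementCountingOutputStream(sink);
      for (byte[] element : elements) {
        os.markElementStart();             // starts or extends a counted block
        os.write(element, 0, element.length);
      }
      os.finish();                         // flushes and writes the final count of 0
    }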
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java
deleted file mode 100644
index 8b41eb8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudKnownType.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/** A utility for manipulating well-known cloud types. */
-enum CloudKnownType {
- TEXT("http://schema.org/Text", String.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- return clazz.cast(value);
- }
- },
- BOOLEAN("http://schema.org/Boolean", Boolean.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- return clazz.cast(value);
- }
- },
- INTEGER("http://schema.org/Integer", Long.class, Integer.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- Object result = null;
- if (value.getClass() == clazz) {
- result = value;
- } else if (clazz == Long.class) {
- if (value instanceof Integer) {
- result = ((Integer) value).longValue();
- } else if (value instanceof String) {
- result = Long.valueOf((String) value);
- }
- } else if (clazz == Integer.class) {
- if (value instanceof Long) {
- result = ((Long) value).intValue();
- } else if (value instanceof String) {
- result = Integer.valueOf((String) value);
- }
- }
- return clazz.cast(result);
- }
- },
- FLOAT("http://schema.org/Float", Double.class, Float.class) {
- @Override
- public <T> T parse(Object value, Class<T> clazz) {
- Object result = null;
- if (value.getClass() == clazz) {
- result = value;
- } else if (clazz == Double.class) {
- if (value instanceof Float) {
- result = ((Float) value).doubleValue();
- } else if (value instanceof String) {
- result = Double.valueOf((String) value);
- }
- } else if (clazz == Float.class) {
- if (value instanceof Double) {
- result = ((Double) value).floatValue();
- } else if (value instanceof String) {
- result = Float.valueOf((String) value);
- }
- }
- return clazz.cast(result);
- }
- };
-
- private final String uri;
- private final Class<?>[] classes;
-
- private CloudKnownType(String uri, Class<?>... classes) {
- this.uri = uri;
- this.classes = classes;
- }
-
- public String getUri() {
- return uri;
- }
-
- public abstract <T> T parse(Object value, Class<T> clazz);
-
- public Class<?> defaultClass() {
- return classes[0];
- }
-
- private static final Map<String, CloudKnownType> typesByUri =
- Collections.unmodifiableMap(buildTypesByUri());
-
- private static Map<String, CloudKnownType> buildTypesByUri() {
- Map<String, CloudKnownType> result = new HashMap<>();
- for (CloudKnownType ty : CloudKnownType.values()) {
- result.put(ty.getUri(), ty);
- }
- return result;
- }
-
- @Nullable
- public static CloudKnownType forUri(@Nullable String uri) {
- if (uri == null) {
- return null;
- }
- return typesByUri.get(uri);
- }
-
- private static final Map<Class<?>, CloudKnownType> typesByClass =
- Collections.unmodifiableMap(buildTypesByClass());
-
- private static Map<Class<?>, CloudKnownType> buildTypesByClass() {
- Map<Class<?>, CloudKnownType> result = new HashMap<>();
- for (CloudKnownType ty : CloudKnownType.values()) {
- for (Class<?> clazz : ty.classes) {
- result.put(clazz, ty);
- }
- }
- return result;
- }
-
- @Nullable
- public static CloudKnownType forClass(Class<?> clazz) {
- return typesByClass.get(clazz);
- }
-}
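
A sketch of the removed CloudKnownType's coercions (not part of this commit; the enum is package-private, so same-package access is assumed):

    CloudKnownType ty = CloudKnownType.forUri("http://schema.org/Integer");
    Long asLong = ty.parse("42", Long.class);      // String -> Long
    Integer asInt = ty.parse(42L, Integer.class);  // Long -> Integer (narrowing)
    Class<?> def = ty.defaultClass();              // Long.class, the first listed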
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java
deleted file mode 100644
index 8c704bf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CloudObject.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.api.client.util.Preconditions.checkNotNull;
-
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.util.Key;
-
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A representation of an arbitrary Java object to be instantiated by Dataflow
- * workers.
- *
- * <p>Typically, an object to be written by the SDK to the Dataflow service will
- * implement a method (typically called {@code asCloudObject()}) that returns a
- * {@code CloudObject} to represent the object in the protocol. Once the
- * {@code CloudObject} is constructed, the method should explicitly add
- * additional properties to be presented during deserialization, representing
- * child objects by building additional {@code CloudObject}s.
- */
-public final class CloudObject extends GenericJson {
- /**
- * Constructs a {@code CloudObject} by copying the supplied serialized object
- * spec, which must represent an SDK object serialized for transport via the
- * Dataflow API.
- *
- * <p>The most common use of this method is during deserialization on the worker,
- * where it's used as a binding type during instance construction.
- *
- * @param spec supplies the serialized form of the object as a nested map
- * @throws RuntimeException if the supplied map does not represent an SDK object
- */
- public static CloudObject fromSpec(Map<String, Object> spec) {
- CloudObject result = new CloudObject();
- result.putAll(spec);
- if (result.className == null) {
- throw new RuntimeException("Unable to create an SDK object from " + spec
- + ": Object class not specified (missing \""
- + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
- }
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} to be used for serializing an instance of
- * the supplied class for transport via the Dataflow API. The instance
- * parameters to be serialized must be supplied explicitly after the
- * {@code CloudObject} is created, by using {@link CloudObject#put}.
- *
- * @param cls the class to use when deserializing the object on the worker
- */
- public static CloudObject forClass(Class<?> cls) {
- CloudObject result = new CloudObject();
- result.className = checkNotNull(cls).getName();
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} to be used for serializing data to be
- * deserialized using the supplied class name for transport via the
- * Dataflow API. The instance parameters to be serialized
- * must be supplied explicitly after the {@code CloudObject} is created, by
- * using {@link CloudObject#put}.
- *
- * @param className the class to use when deserializing the object on the worker
- */
- public static CloudObject forClassName(String className) {
- CloudObject result = new CloudObject();
- result.className = checkNotNull(className);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forString(String value) {
- CloudObject result = forClassName(CloudKnownType.TEXT.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forBoolean(Boolean value) {
- CloudObject result = forClassName(CloudKnownType.BOOLEAN.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forInteger(Long value) {
- CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forInteger(Integer value) {
- CloudObject result = forClassName(CloudKnownType.INTEGER.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forFloat(Float value) {
- CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value.
- * @param value the scalar value to represent.
- */
- public static CloudObject forFloat(Double value) {
- CloudObject result = forClassName(CloudKnownType.FLOAT.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- /**
- * Constructs a {@code CloudObject} representing the given value of a
- * well-known cloud object type.
- * @param value the scalar value to represent.
- * @throws RuntimeException if the value does not have a
- * {@link CloudKnownType} mapping
- */
- public static CloudObject forKnownType(Object value) {
- @Nullable CloudKnownType ty = CloudKnownType.forClass(value.getClass());
- if (ty == null) {
- throw new RuntimeException("Unable to represent value via the Dataflow API: " + value);
- }
- CloudObject result = forClassName(ty.getUri());
- result.put(PropertyNames.SCALAR_FIELD_NAME, value);
- return result;
- }
-
- @Key(PropertyNames.OBJECT_TYPE_NAME)
- private String className;
-
- private CloudObject() {}
-
- /**
- * Gets the name of the Java class that this CloudObject represents.
- */
- public String getClassName() {
- return className;
- }
-
- @Override
- public CloudObject clone() {
- return (CloudObject) super.clone();
- }
-}
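For reference while reviewing the move, a typical round trip with the CloudObject API above builds a spec with a factory method, adds instance parameters with put(), and reconstructs the object from its nested-map form with fromSpec(). A minimal sketch, assuming only the class shown in this diff; "sample_field" is a made-up field name for illustration:

import com.google.cloud.dataflow.sdk.util.CloudObject;

public class CloudObjectRoundTrip {
  public static void main(String[] args) {
    // forKnownType() picks the well-known type URI for a String scalar.
    CloudObject spec = CloudObject.forKnownType("hello");

    // Instance parameters are added explicitly after construction.
    spec.put("sample_field", 42L);  // hypothetical field name

    // fromSpec() rebuilds a CloudObject from its map form and fails fast
    // if the object-type field is missing; CloudObject is itself a
    // Map<String, Object>, so the spec can be passed directly.
    CloudObject restored = CloudObject.fromSpec(spec);
    System.out.println(restored.getClassName());
  }
}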
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java
deleted file mode 100644
index ddab933..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CoderUtils.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-
-import com.google.api.client.util.Base64;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoderBase;
-import com.google.cloud.dataflow.sdk.coders.MapCoder;
-import com.google.cloud.dataflow.sdk.coders.MapCoderBase;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Throwables;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
-import com.fasterxml.jackson.databind.DatabindContext;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
-import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.type.TypeFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.ref.SoftReference;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.TypeVariable;
-
-/**
- * Utilities for working with Coders.
- */
-public final class CoderUtils {
- private CoderUtils() {} // Non-instantiable
-
- /**
- * Coder class-name alias for a key-value type.
- */
- public static final String KIND_PAIR = "kind:pair";
-
- /**
- * Coder class-name alias for a stream type.
- */
- public static final String KIND_STREAM = "kind:stream";
-
- private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>> threadLocalOutputStream
- = new ThreadLocal<>();
-
- /**
- * If true, a call to {@code encodeToByteArray} is already on the call stack.
- */
- private static ThreadLocal<Boolean> threadLocalOutputStreamInUse = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return false;
- }
- };
-
- /**
- * Encodes the given value using the specified Coder, and returns
- * the encoded bytes.
- *
- * <p>This function is not reentrant; it should not be called from methods of the provided
- * {@link Coder}.
- */
- public static <T> byte[] encodeToByteArray(Coder<T> coder, T value) throws CoderException {
- return encodeToByteArray(coder, value, Coder.Context.OUTER);
- }
-
- public static <T> byte[] encodeToByteArray(Coder<T> coder, T value, Coder.Context context)
- throws CoderException {
- if (threadLocalOutputStreamInUse.get()) {
- // encodeToByteArray() was called recursively while the thread-local stream
- // was already in use, so allocate a fresh one.
- ByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
- encodeToSafeStream(coder, value, stream, context);
- return stream.toByteArray();
- } else {
- threadLocalOutputStreamInUse.set(true);
- try {
- ByteArrayOutputStream stream = getThreadLocalOutputStream();
- encodeToSafeStream(coder, value, stream, context);
- return stream.toByteArray();
- } finally {
- threadLocalOutputStreamInUse.set(false);
- }
- }
- }
-
- /**
- * Encodes {@code value} to the given {@code stream}, which should be a stream that never throws
- * {@code IOException}, such as {@code ByteArrayOutputStream} or
- * {@link ExposedByteArrayOutputStream}.
- */
- private static <T> void encodeToSafeStream(
- Coder<T> coder, T value, OutputStream stream, Coder.Context context) throws CoderException {
- try {
- coder.encode(value, new UnownedOutputStream(stream), context);
- } catch (IOException exn) {
- Throwables.propagateIfPossible(exn, CoderException.class);
- throw new IllegalArgumentException(
- "Forbidden IOException when writing to OutputStream", exn);
- }
- }
-
- /**
- * Decodes the given bytes using the specified Coder, and returns
- * the resulting decoded value.
- */
- public static <T> T decodeFromByteArray(Coder<T> coder, byte[] encodedValue)
- throws CoderException {
- return decodeFromByteArray(coder, encodedValue, Coder.Context.OUTER);
- }
-
- public static <T> T decodeFromByteArray(
- Coder<T> coder, byte[] encodedValue, Coder.Context context) throws CoderException {
- try (ExposedByteArrayInputStream stream = new ExposedByteArrayInputStream(encodedValue)) {
- T result = decodeFromSafeStream(coder, stream, context);
- if (stream.available() != 0) {
- throw new CoderException(
- stream.available() + " unexpected extra bytes after decoding " + result);
- }
- return result;
- }
- }
-
- /**
- * Decodes a value from the given {@code stream}, which should be a stream that never throws
- * {@code IOException}, such as {@code ByteArrayInputStream} or
- * {@link ExposedByteArrayInputStream}.
- */
- private static <T> T decodeFromSafeStream(
- Coder<T> coder, InputStream stream, Coder.Context context) throws CoderException {
- try {
- return coder.decode(new UnownedInputStream(stream), context);
- } catch (IOException exn) {
- Throwables.propagateIfPossible(exn, CoderException.class);
- throw new IllegalArgumentException(
- "Forbidden IOException when reading from InputStream", exn);
- }
- }
-
- private static ByteArrayOutputStream getThreadLocalOutputStream() {
- SoftReference<ExposedByteArrayOutputStream> refStream = threadLocalOutputStream.get();
- ExposedByteArrayOutputStream stream = refStream == null ? null : refStream.get();
- if (stream == null) {
- stream = new ExposedByteArrayOutputStream();
- threadLocalOutputStream.set(new SoftReference<>(stream));
- }
- stream.reset();
- return stream;
- }
-
- /**
- * Clones the given value by encoding and then decoding it with the specified Coder.
- *
- * <p>This function is not reentrant; it should not be called from methods of the provided
- * {@link Coder}.
- */
- public static <T> T clone(Coder<T> coder, T value) throws CoderException {
- return decodeFromByteArray(coder, encodeToByteArray(coder, value, Coder.Context.OUTER));
- }
-
- /**
- * Encodes the given value using the specified Coder, and returns the Base64 encoding of the
- * encoded bytes.
- *
- * @throws CoderException if there are errors during encoding.
- */
- public static <T> String encodeToBase64(Coder<T> coder, T value)
- throws CoderException {
- byte[] rawValue = encodeToByteArray(coder, value);
- return Base64.encodeBase64URLSafeString(rawValue);
- }
-
- /**
- * Parses a value from a base64-encoded String using the given coder.
- */
- public static <T> T decodeFromBase64(Coder<T> coder, String encodedValue) throws CoderException {
- return decodeFromSafeStream(
- coder, new ByteArrayInputStream(Base64.decodeBase64(encodedValue)), Coder.Context.OUTER);
- }
-
- /**
- * If {@code coderDescriptor} describes a subclass of {@code Coder<T>} for a
- * specific type {@code T}, returns a {@link TypeDescriptor} for {@code T}.
- */
- @SuppressWarnings({"rawtypes", "unchecked"})
- public static TypeDescriptor getCodedType(TypeDescriptor coderDescriptor) {
- ParameterizedType coderType =
- (ParameterizedType) coderDescriptor.getSupertype(Coder.class).getType();
- TypeDescriptor codedType = TypeDescriptor.of(coderType.getActualTypeArguments()[0]);
- return codedType;
- }
-
- public static CloudObject makeCloudEncoding(
- String type,
- CloudObject... componentSpecs) {
- CloudObject encoding = CloudObject.forClassName(type);
- if (componentSpecs.length > 0) {
- addList(encoding, PropertyNames.COMPONENT_ENCODINGS, componentSpecs);
- }
- return encoding;
- }
-
- /**
- * A {@link com.fasterxml.jackson.databind.Module} that adds the type
- * resolver needed for Coder definitions created by the Dataflow service.
- */
- static final class Jackson2Module extends SimpleModule {
- /**
- * The Coder custom type resolver.
- *
- * <p>This resolver resolves coders. If the Coder ID is a well-known
- * identifier supplied by the Dataflow service, it is replaced with the
- * corresponding class. All other Coder IDs are resolved by class name,
- * prepending the package com.google.cloud.dataflow.sdk.coders when the
- * ID contains no "." characters.
- */
- private static final class Resolver extends TypeIdResolverBase {
- @SuppressWarnings("unused") // Used via @JsonTypeIdResolver annotation on Mixin
- public Resolver() {
- super(TypeFactory.defaultInstance().constructType(Coder.class),
- TypeFactory.defaultInstance());
- }
-
- @Deprecated
- @Override
- public JavaType typeFromId(String id) {
- return typeFromId(null, id);
- }
-
- @Override
- public JavaType typeFromId(DatabindContext context, String id) {
- Class<?> clazz = getClassForId(id);
- if (clazz == KvCoder.class) {
- clazz = KvCoderBase.class;
- }
- if (clazz == MapCoder.class) {
- clazz = MapCoderBase.class;
- }
- @SuppressWarnings("rawtypes")
- TypeVariable[] tvs = clazz.getTypeParameters();
- JavaType[] types = new JavaType[tvs.length];
- for (int lupe = 0; lupe < tvs.length; lupe++) {
- types[lupe] = TypeFactory.unknownType();
- }
- return _typeFactory.constructSimpleType(clazz, types);
- }
-
- private Class<?> getClassForId(String id) {
- try {
- if (id.contains(".")) {
- return Class.forName(id);
- }
-
- if (id.equals(KIND_STREAM)) {
- return IterableCoder.class;
- } else if (id.equals(KIND_PAIR)) {
- return KvCoder.class;
- }
-
- // Otherwise, see if the ID is the name of a class in
- // com.google.cloud.dataflow.sdk.coders. We do this by creating
- // the class object so that class loaders have a chance to get
- // involved -- and since we need the class object anyway.
- return Class.forName(Coder.class.getPackage().getName() + "." + id);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Unable to convert coder ID " + id + " to class", e);
- }
- }
-
- @Override
- public String idFromValueAndType(Object o, Class<?> clazz) {
- return clazz.getName();
- }
-
- @Override
- public String idFromValue(Object o) {
- return o.getClass().getName();
- }
-
- @Override
- public JsonTypeInfo.Id getMechanism() {
- return JsonTypeInfo.Id.CUSTOM;
- }
- }
-
- /**
- * The mixin class defining how Coders are handled by the deserialization
- * {@link ObjectMapper}.
- *
- * <p>This is done via a mixin so that this resolver is <i>only</i> used
- * during deserialization requested by the Dataflow SDK.
- */
- @JsonTypeIdResolver(Resolver.class)
- @JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = PropertyNames.OBJECT_TYPE_NAME)
- private static final class Mixin {}
-
- public Jackson2Module() {
- super("DataflowCoders");
- setMixInAnnotation(Coder.class, Mixin.class);
- }
- }
-}
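For reviewers, the most common entry points in CoderUtils are the byte-array and Base64 round trips. A minimal sketch, assuming StringUtf8Coder from this SDK's coders package:

import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.util.CoderUtils;

public class CoderUtilsRoundTrip {
  public static void main(String[] args) throws CoderException {
    StringUtf8Coder coder = StringUtf8Coder.of();

    // Uses the thread-local stream path above; not reentrant, so do not
    // call this from inside a Coder's own encode()/decode().
    byte[] bytes = CoderUtils.encodeToByteArray(coder, "hello");

    // decodeFromByteArray() insists every byte is consumed and throws
    // CoderException if trailing bytes remain.
    String decoded = CoderUtils.decodeFromByteArray(coder, bytes);

    // The Base64 variants wrap the same machinery, handy for embedding
    // encoded values in JSON pipeline specs.
    String base64 = CoderUtils.encodeToBase64(coder, decoded);
    System.out.println(CoderUtils.decodeFromBase64(coder, base64));
  }
}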
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java
deleted file mode 100644
index 6f2b89b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineContextFactory.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.state.StateContext;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-/**
- * Factory that produces a {@code Combine.Context} from a variety of inputs.
- */
-public class CombineContextFactory {
-
- private static final Context NULL_CONTEXT = new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context");
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- throw new IllegalArgumentException("cannot call sideInput() in a null context");
- }
- };
-
- /**
- * Returns a fake {@code Combine.Context} for tests.
- */
- public static Context nullContext() {
- return NULL_CONTEXT;
- }
-
- /**
- * Returns a {@code Combine.Context} that wraps a {@code DoFn.ProcessContext}.
- */
- public static Context createFromProcessContext(final DoFn<?, ?>.ProcessContext c) {
- return new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return c.sideInput(view);
- }
- };
- }
-
- /**
- * Returns a {@code Combine.Context} that wraps a {@link StateContext}.
- */
- public static Context createFromStateContext(final StateContext<?> c) {
- return new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return c.sideInput(view);
- }
- };
- }
-
- /**
- * Returns a {@code Combine.Context} from {@code PipelineOptions}, {@code SideInputReader},
- * and the main input window.
- */
- public static Context createFromComponents(final PipelineOptions options,
- final SideInputReader sideInputReader, final BoundedWindow mainInputWindow) {
- return new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- if (!sideInputReader.contains(view)) {
- throw new IllegalArgumentException("calling sideInput() with unknown view");
- }
-
- BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
- return sideInputReader.get(view, sideInputWindow);
- }
- };
- }
-}
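The null context above is the piece most likely to appear outside runner internals. A minimal sketch of its fail-fast behavior, using only the factory shown in this diff:

import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
import com.google.cloud.dataflow.sdk.util.CombineContextFactory;

public class NullContextDemo {
  public static void main(String[] args) {
    Context context = CombineContextFactory.nullContext();
    try {
      // The null context carries no options and no side inputs, so any
      // access throws; tests rely on this to catch accidental use.
      context.getPipelineOptions();
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}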
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
deleted file mode 100644
index d974480..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java
+++ /dev/null
@@ -1,154 +0,0 @@
-
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.util.state.StateContext;
-
-import java.io.IOException;
-import java.io.NotSerializableException;
-import java.io.ObjectOutputStream;
-
-/**
- * Static utility methods that create combine function instances.
- */
-public class CombineFnUtil {
- /**
- * Returns the partial application of the {@link KeyedCombineFnWithContext} to a specific
- * context to produce a {@link KeyedCombineFn}.
- *
- * <p>The returned {@link KeyedCombineFn} cannot be serialized.
- */
- public static <K, InputT, AccumT, OutputT> KeyedCombineFn<K, InputT, AccumT, OutputT>
- bindContext(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
- StateContext<?> stateContext) {
- Context context = CombineContextFactory.createFromStateContext(stateContext);
- return new NonSerializableBoundedKeyedCombineFn<>(combineFn, context);
- }
-
- /**
- * Returns a {@link CombineFnWithContext} view of the given {@link GlobalCombineFn}.
- */
- public static <InputT, AccumT, OutputT>
- CombineFnWithContext<InputT, AccumT, OutputT> toFnWithContext(
- GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
- if (globalCombineFn instanceof CombineFnWithContext) {
- @SuppressWarnings("unchecked")
- CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext =
- (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn;
- return combineFnWithContext;
- } else {
- @SuppressWarnings("unchecked")
- final CombineFn<InputT, AccumT, OutputT> combineFn =
- (CombineFn<InputT, AccumT, OutputT>) globalCombineFn;
- return new CombineFnWithContext<InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(Context c) {
- return combineFn.createAccumulator();
- }
- @Override
- public AccumT addInput(AccumT accumulator, InputT input, Context c) {
- return combineFn.addInput(accumulator, input);
- }
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
- return combineFn.mergeAccumulators(accumulators);
- }
- @Override
- public OutputT extractOutput(AccumT accumulator, Context c) {
- return combineFn.extractOutput(accumulator);
- }
- @Override
- public AccumT compact(AccumT accumulator, Context c) {
- return combineFn.compact(accumulator);
- }
- @Override
- public OutputT defaultValue() {
- return combineFn.defaultValue();
- }
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return combineFn.getAccumulatorCoder(registry, inputCoder);
- }
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return combineFn.getDefaultOutputCoder(registry, inputCoder);
- }
- };
- }
- }
-
- private static class NonSerializableBoundedKeyedCombineFn<K, InputT, AccumT, OutputT>
- extends KeyedCombineFn<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
- private final Context context;
-
- private NonSerializableBoundedKeyedCombineFn(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
- Context context) {
- this.combineFn = combineFn;
- this.context = context;
- }
- @Override
- public AccumT createAccumulator(K key) {
- return combineFn.createAccumulator(key, context);
- }
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT value) {
- return combineFn.addInput(key, accumulator, value, context);
- }
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators, context);
- }
- @Override
- public OutputT extractOutput(K key, AccumT accumulator) {
- return combineFn.extractOutput(key, accumulator, context);
- }
- @Override
- public AccumT compact(K key, AccumT accumulator) {
- return combineFn.compact(key, accumulator, context);
- }
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return combineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
- }
-
- private void writeObject(@SuppressWarnings("unused") ObjectOutputStream out)
- throws IOException {
- throw new NotSerializableException(
- "Cannot serialize the CombineFn resulting from CombineFnUtil.bindContext.");
- }
- }
-}
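To make toFnWithContext() concrete: it lets context-free and context-aware combine fns share one code path, forwarding every call and ignoring the Context argument. A minimal sketch, assuming only classes from this SDK; SumLongs is a made-up CombineFn for illustration:

import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
import com.google.cloud.dataflow.sdk.util.CombineContextFactory;
import com.google.cloud.dataflow.sdk.util.CombineFnUtil;

public class ToFnWithContextDemo {
  // A deliberately tiny context-free CombineFn: a running Long sum.
  static class SumLongs extends CombineFn<Long, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long addInput(Long acc, Long input) { return acc + input; }
    @Override public Long mergeAccumulators(Iterable<Long> accs) {
      long sum = 0;
      for (long a : accs) { sum += a; }
      return sum;
    }
    @Override public Long extractOutput(Long acc) { return acc; }
  }

  public static void main(String[] args) {
    // The adapter forwards to SumLongs and ignores every Context argument.
    CombineFnWithContext<Long, Long, Long> fn =
        CombineFnUtil.toFnWithContext(new SumLongs());
    Context context = CombineContextFactory.nullContext();

    Long acc = fn.createAccumulator(context);
    for (long x : new long[] {1, 2, 3}) {
      acc = fn.addInput(acc, x, context);
    }
    System.out.println(fn.extractOutput(acc, context));  // prints 6
  }
}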