You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/11 14:07:34 UTC
[1/5] incubator-beam git commit: Move Datastore from sdks/java/core
to io/gcp
Repository: incubator-beam
Updated Branches:
refs/heads/master 9d7002545 -> cf874d426
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
new file mode 100644
index 0000000..0ba4433
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -0,0 +1,992 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Verify.verify;
+import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Sink.WriteOperation;
+import org.apache.beam.sdk.io.Sink.Writer;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import com.google.datastore.v1beta3.CommitRequest;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Key.PathElement;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreException;
+import com.google.datastore.v1beta3.client.DatastoreFactory;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.DatastoreOptions;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * <p>{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of
+ * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
+ * {@link Entity} objects.
+ *
+ * <p>This API currently requires an authentication workaround. To use {@link V1Beta3}, users
+ * must use the {@code gcloud} command line tool to get credentials for Datastore:
+ * <pre>
+ * $ gcloud auth login
+ * </pre>
+ *
+ * <p>To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and
+ * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to
+ * specify the project to query and the query to read from. You can optionally provide a namespace
+ * to query within using {@link V1Beta3.Read#withNamespace}.
+ *
+ * <p>For example:
+ *
+ * <pre> {@code
+ * // Read a query from Datastore
+ * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+ * Query query = ...;
+ * String projectId = "...";
+ *
+ * Pipeline p = Pipeline.create(options);
+ * PCollection<Entity> entities = p.apply(
+ * DatastoreIO.v1beta3().read()
+ * .withProjectId(projectId)
+ * .withQuery(query));
+ * } </pre>
+ *
+ * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
+ * many workers. However, when the {@link Query} is configured with a limit using
+ * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
+ * all returned results will be read by a single Dataflow worker in order to ensure correct data.
+ *
+ * <p>To write a {@link PCollection} to a Datastore, use {@link V1Beta3#write},
+ * specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
+ * {@link Key Keys}. A complete {@code Key} specifies either a {@code name} or an {@code id} for
+ * the {@code Entity}; an incomplete {@code Key} specifies neither. A {@code namespace} other than
+ * the project's default namespace may be used by specifying it in the {@code Entity}
+ * {@code Keys}.
+ *
+ * <pre>{@code
+ * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
+ * keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+ * }</pre>
+ *
+ * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read
+ * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and
+ * Keys</a> for more information about {@code Entity} keys.
+ *
+ * <p><h3>Permissions</h3>
+ * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
+ * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
+ * more details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign
+ * Up</a> for security and permission related information specific to Datastore.
+ *
+ * @see org.apache.beam.sdk.runners.PipelineRunner
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class V1Beta3 {
+
+ // A package-private constructor to prevent direct instantiation from outside of this package
+ V1Beta3() {}
+
+ /**
+ * Datastore has a limit of 500 mutations per batch operation, so we flush
+ * changes to Datastore every 500 entities.
+ */
+ private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+
+ /**
+ * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
+ * {@code query}, and optionally {@code namespace} using {@link V1Beta3.Read#withProjectId},
+ * {@link V1Beta3.Read#withQuery}, and {@link V1Beta3.Read#withNamespace}.
+ */
+ public V1Beta3.Read read() {
+ return new V1Beta3.Read(null, null, null);
+ }
+
+  /**
+   * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity}
+   * objects.
+   *
+   * <p>Instances are immutable: each {@code with*} method returns a new {@code Read} with one
+   * setting replaced and the others copied.
+   *
+   * @see DatastoreIO
+   */
+  public static class Read extends PTransform<PBegin, PCollection<Entity>> {
+    @Nullable
+    private final String projectId;
+
+    @Nullable
+    private final Query query;
+
+    @Nullable
+    private final String namespace;
+
+    /**
+     * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
+     * {@code null} while the transform is still being built, but if they are still {@code null}
+     * when the transform is validated, {@link #validate} throws a {@link NullPointerException}.
+     */
+    private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace) {
+      this.projectId = projectId;
+      this.query = query;
+      this.namespace = namespace;
+    }
+
+    /**
+     * Returns a new {@link V1Beta3.Read} that reads from the Datastore for the specified project.
+     *
+     * @throws NullPointerException if {@code projectId} is {@code null}
+     */
+    public V1Beta3.Read withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new V1Beta3.Read(projectId, query, namespace);
+    }
+
+    /**
+     * Returns a new {@link V1Beta3.Read} that reads the results of the specified query.
+     *
+     * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
+     * across many workers. However, when the {@link Query} is configured with a limit using
+     * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
+     * to ensure correct results.
+     *
+     * @throws NullPointerException if {@code query} is {@code null}
+     * @throws IllegalArgumentException if {@code query} has a limit that is not positive
+     */
+    public V1Beta3.Read withQuery(Query query) {
+      checkNotNull(query, "query");
+      checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
+          "Invalid query limit %s: must be positive", query.getLimit().getValue());
+      return new V1Beta3.Read(projectId, query, namespace);
+    }
+
+    /**
+     * Returns a new {@link V1Beta3.Read} that reads from the given namespace. Passing
+     * {@code null} clears the namespace, so no namespace is set on the Datastore requests.
+     */
+    public V1Beta3.Read withNamespace(String namespace) {
+      return new V1Beta3.Read(projectId, query, namespace);
+    }
+
+    @Nullable
+    public Query getQuery() {
+      return query;
+    }
+
+    @Nullable
+    public String getProjectId() {
+      return projectId;
+    }
+
+    @Nullable
+    public String getNamespace() {
+      return namespace;
+    }
+
+    @Override
+    public PCollection<Entity> apply(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
+    }
+
+    /**
+     * Checks that both the project and the query have been configured.
+     *
+     * @throws NullPointerException if {@code projectId} or {@code query} is {@code null}
+     */
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(projectId, "projectId");
+      checkNotNull(query, "query");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      // Display data may be requested for a partially configured transform, so the query can
+      // still be null here; let addIfNotNull skip it instead of dereferencing it.
+      String queryString = query == null ? null : query.toString();
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("ProjectId"))
+          .addIfNotNull(DisplayData.item("namespace", namespace)
+              .withLabel("Namespace"))
+          .addIfNotNull(DisplayData.item("query", queryString)
+              .withLabel("Query"));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .add("query", query)
+          .add("namespace", namespace)
+          .toString();
+    }
+
+    /** Builds the {@link DatastoreSource} that backs this transform. */
+    @VisibleForTesting
+    DatastoreSource getSource() {
+      return new DatastoreSource(projectId, query, namespace);
+    }
+  }
+
+  /**
+   * Returns a {@link V1Beta3.Write} with no destination project configured yet; supply one with
+   * {@link V1Beta3.Write#withProjectId}.
+   */
+  public Write write() {
+    Write unconfigured = new Write(null);
+    return unconfigured;
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} of {@link Entity} objects to Cloud
+   * Datastore by way of a {@link DatastoreSink}.
+   *
+   * @see DatastoreIO
+   */
+  public static class Write extends PTransform<PCollection<Entity>, PDone> {
+    @Nullable
+    private final String projectId;
+
+    /**
+     * Creates a {@code Write} for {@code projectId}. The argument may be {@code null} while the
+     * transform is still being configured; {@link #validate} rejects a {@code null} project.
+     */
+    public Write(@Nullable String projectId) {
+      this.projectId = projectId;
+    }
+
+    /** Returns the destination project, or {@code null} if it has not been configured yet. */
+    @Nullable
+    public String getProjectId() {
+      return projectId;
+    }
+
+    /**
+     * Returns a copy of this transform that writes to the given Cloud Datastore project.
+     *
+     * @throws NullPointerException if {@code projectId} is {@code null}
+     */
+    public Write withProjectId(String projectId) {
+      return new Write(checkNotNull(projectId, "projectId"));
+    }
+
+    @Override
+    public void validate(PCollection<Entity> input) {
+      checkNotNull(projectId, "projectId");
+    }
+
+    @Override
+    public PDone apply(PCollection<Entity> input) {
+      DatastoreSink sink = new DatastoreSink(projectId);
+      return input.apply(org.apache.beam.sdk.io.Write.to(sink));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(
+          DisplayData.item("projectId", projectId).withLabel("Output Project"));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass()).add("projectId", projectId).toString();
+    }
+  }
+
+  /**
+   * A {@link org.apache.beam.sdk.io.Source} that reads the results of a Datastore {@link Query}
+   * from a single project and, optionally, a single namespace.
+   *
+   * <p>When the query has no user-specified limit, {@link #splitIntoBundles} asks the Datastore
+   * {@link QuerySplitter} for sub-queries so that the results can be read in parallel; queries
+   * with a limit are always read by a single reader.
+   */
+  static class DatastoreSource extends BoundedSource<Entity> {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreSource.class);
+
+    /**
+     * Number of splits requested when the result size cannot be estimated (for example because
+     * Datastore statistics are not available for the queried kind).
+     */
+    private static final int DEFAULT_NUM_SPLITS = 12;
+
+    private final String projectId;
+    private final Query query;
+    @Nullable
+    private final String namespace;
+
+    /** For testing only. TODO: This could be much cleaner with dependency injection. */
+    @Nullable
+    private QuerySplitter mockSplitter;
+    @Nullable
+    private Long mockEstimateSizeBytes;
+
+    DatastoreSource(String projectId, Query query, @Nullable String namespace) {
+      this.projectId = projectId;
+      this.query = query;
+      this.namespace = namespace;
+    }
+
+    @Override
+    public Coder<Entity> getDefaultOutputCoder() {
+      return ProtoCoder.of(Entity.class);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) {
+      return false;
+    }
+
+    /**
+     * Splits this source into sub-sources of roughly {@code desiredBundleSizeBytes} each.
+     *
+     * <p>Returns a singleton list containing this source when the query has a limit, when the
+     * computed number of splits is at most 1, or when Datastore cannot split the query.
+     */
+    @Override
+    public List<DatastoreSource> splitIntoBundles(long desiredBundleSizeBytes,
+        PipelineOptions options) throws Exception {
+      // Users may request a limit on the number of results. We can currently support this by
+      // simply disabling parallel reads and using only a single split.
+      if (query.hasLimit()) {
+        return ImmutableList.of(this);
+      }
+
+      long numSplits;
+      try {
+        numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes);
+      } catch (Exception e) {
+        // Fallback in case the estimated size is unavailable.
+        // TODO: a fixed number of splits is a poor heuristic; replace it.
+        LOG.warn("Unable to estimate the size of query {}; falling back to {} splits.",
+            query, DEFAULT_NUM_SPLITS, e);
+        numSplits = DEFAULT_NUM_SPLITS;
+      }
+
+      // If the desiredBundleSize or number of workers results in 1 split, simply return
+      // a source that reads from the original query.
+      if (numSplits <= 1) {
+        return ImmutableList.of(this);
+      }
+
+      List<Query> datastoreSplits;
+      try {
+        datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options);
+      } catch (IllegalArgumentException | DatastoreException e) {
+        LOG.warn("Unable to parallelize the given query: {}", query, e);
+        return ImmutableList.of(this);
+      }
+
+      ImmutableList.Builder<DatastoreSource> splits = ImmutableList.builder();
+      for (Query splitQuery : datastoreSplits) {
+        splits.add(new DatastoreSource(projectId, splitQuery, namespace));
+      }
+      return splits.build();
+    }
+
+    @Override
+    public BoundedReader<Entity> createReader(PipelineOptions pipelineOptions) throws IOException {
+      return new DatastoreReader(this, getDatastore(pipelineOptions));
+    }
+
+    @Override
+    public void validate() {
+      checkNotNull(query, "query");
+      checkNotNull(projectId, "projectId");
+    }
+
+    /**
+     * Estimates the size of the query result as the {@code entity_bytes} statistic of the queried
+     * kind, taken from the most recent Datastore statistics snapshot.
+     *
+     * <p>See https://cloud.google.com/datastore/docs/concepts/stats
+     *
+     * @throws UnsupportedOperationException if the query does not specify exactly one kind
+     * @throws NoSuchElementException if the required statistics are not available
+     */
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      // Datastore provides no way to get a good estimate of how large the result of a query
+      // will be. As a rough approximation, we fetch the statistics of the whole entity kind
+      // being queried (ignoring any filters), assuming exactly 1 kind is specified in the query.
+      if (mockEstimateSizeBytes != null) {
+        return mockEstimateSizeBytes;
+      }
+
+      if (query.getKindCount() != 1) {
+        throw new UnsupportedOperationException(
+            "Can only estimate size for queries specifying exactly 1 kind.");
+      }
+      String ourKind = query.getKind(0).getName();
+      Datastore datastore = getDatastore(options);
+      long latestTimestamp = queryLatestStatisticsTimestamp(datastore);
+
+      // Per-kind statistics live in __Stat_Kind__ for the default namespace and in
+      // __Stat_Ns_Kind__ for any other namespace.
+      Query.Builder statsQuery = Query.newBuilder();
+      if (namespace == null) {
+        statsQuery.addKindBuilder().setName("__Stat_Kind__");
+      } else {
+        statsQuery.addKindBuilder().setName("__Stat_Ns_Kind__");
+      }
+      statsQuery.setFilter(makeAndFilter(
+          makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
+          makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
+      RunQueryRequest request = makeRequest(statsQuery.build());
+
+      long now = System.currentTimeMillis();
+      RunQueryResponse response = datastore.runQuery(request);
+      LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
+
+      QueryResultBatch batch = response.getBatch();
+      if (batch.getEntityResultsCount() == 0) {
+        throw new NoSuchElementException(
+            "Datastore statistics for kind " + ourKind + " unavailable");
+      }
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("entity_bytes").getIntegerValue();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      // The source is not validated on construction, so guard against a null query here.
+      String queryString = query == null ? null : query.toString();
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("ProjectId"))
+          .addIfNotNull(DisplayData.item("namespace", namespace)
+              .withLabel("Namespace"))
+          .addIfNotNull(DisplayData.item("query", queryString)
+              .withLabel("Query"));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .add("query", query)
+          .add("namespace", namespace)
+          .toString();
+    }
+
+    /**
+     * A helper function to get the split queries, taking into account the optional
+     * {@code namespace} and whether there is a mock splitter.
+     */
+    private List<Query> getSplitQueries(int numSplits, PipelineOptions options)
+        throws DatastoreException {
+      // If namespace is set, include it in the split request so splits are calculated accordingly.
+      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
+      if (namespace != null) {
+        partitionBuilder.setNamespaceId(namespace);
+      }
+
+      if (mockSplitter != null) {
+        // For testing.
+        return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null);
+      }
+
+      return DatastoreHelper.getQuerySplitter().getSplits(
+          query, partitionBuilder.build(), numSplits, getDatastore(options));
+    }
+
+    /**
+     * Builds a {@link RunQueryRequest} from the {@code query}, using the properties set on this
+     * {@code DatastoreSource}. For example, sets the {@code namespace} for the request.
+     */
+    private RunQueryRequest makeRequest(Query query) {
+      RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
+      if (namespace != null) {
+        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+      }
+      return requestBuilder.build();
+    }
+
+    /**
+     * Datastore system tables with statistics are periodically updated. This method fetches the
+     * timestamp of the latest statistics update from {@code __Stat_Total__} (or
+     * {@code __Stat_Ns_Total__} when a namespace is set) and returns it in microseconds since the
+     * epoch, so that it can be used in an equality filter on the {@code timestamp} property of
+     * other statistics entities.
+     */
+    private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException {
+      Query.Builder statsQuery = Query.newBuilder();
+      if (namespace == null) {
+        statsQuery.addKindBuilder().setName("__Stat_Total__");
+      } else {
+        statsQuery.addKindBuilder().setName("__Stat_Ns_Total__");
+      }
+      statsQuery.addOrder(makeOrder("timestamp", DESCENDING));
+      statsQuery.setLimit(Int32Value.newBuilder().setValue(1));
+      RunQueryRequest request = makeRequest(statsQuery.build());
+
+      long now = System.currentTimeMillis();
+      RunQueryResponse response = datastore.runQuery(request);
+      LOG.info("Query for latest stats timestamp of project {} took {}ms", projectId,
+          System.currentTimeMillis() - now);
+      QueryResultBatch batch = response.getBatch();
+      if (batch.getEntityResultsCount() == 0) {
+        throw new NoSuchElementException(
+            "Datastore total statistics for project " + projectId + " unavailable");
+      }
+      Entity entity = batch.getEntityResults(0).getEntity();
+      // Use the whole timestamp: getNanos() on its own is only the sub-second fraction.
+      // Datastore orders integers and timestamps together, with timestamps expressed in
+      // microseconds, so the microsecond value matches the stored timestamp in an EQUAL filter.
+      long seconds = entity.getProperties().get("timestamp").getTimestampValue().getSeconds();
+      int nanos = entity.getProperties().get("timestamp").getTimestampValue().getNanos();
+      return seconds * 1000000L + nanos / 1000;
+    }
+
+    /**
+     * Creates a Datastore client for {@code projectId} with retrying HTTP requests, using the GCP
+     * credential from {@code pipelineOptions} when one is available.
+     */
+    private Datastore getDatastore(PipelineOptions pipelineOptions) {
+      DatastoreOptions.Builder builder =
+          new DatastoreOptions.Builder()
+              .projectId(projectId)
+              .initializer(new RetryHttpRequestInitializer());
+
+      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+      if (credential != null) {
+        builder.credential(credential);
+      }
+      return DatastoreFactory.get().create(builder.build());
+    }
+
+    /** For testing only. */
+    DatastoreSource withMockSplitter(QuerySplitter splitter) {
+      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
+      res.mockSplitter = splitter;
+      res.mockEstimateSizeBytes = mockEstimateSizeBytes;
+      return res;
+    }
+
+    /** For testing only. */
+    DatastoreSource withMockEstimateSizeBytes(Long estimateSizeBytes) {
+      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
+      res.mockSplitter = mockSplitter;
+      res.mockEstimateSizeBytes = estimateSizeBytes;
+      return res;
+    }
+
+    @VisibleForTesting
+    Query getQuery() {
+      return query;
+    }
+  }
+
+ /**
+ * A {@link BoundedSource.BoundedReader} over the entities returned by a {@link DatastoreSource}'s
+ * query. Results are fetched in batches of at most {@code QUERY_BATCH_LIMIT} entities, each
+ * batch resuming from the end cursor of the previous one.
+ *
+ * <p>Timestamped records are currently not supported.
+ * All records implicitly have the timestamp of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
+ *
+ * <p>Dynamic splitting and progress reporting are not supported: the whole source is reported
+ * as a single split point, and {@code splitAtFraction} / {@code getFractionConsumed} return
+ * {@code null}.
+ */
+ @VisibleForTesting
+ static class DatastoreReader extends BoundedSource.BoundedReader<Entity> {
+ /** The source whose query this reader executes. */
+ private final DatastoreSource source;
+
+ /**
+ * Datastore to read from.
+ */
+ private final Datastore datastore;
+
+ /**
+ * True if more results may be available after the current batch. Recomputed by
+ * {@code getIteratorAndMoveCursor()} after every query.
+ */
+ private boolean moreResults;
+
+ /**
+ * Iterator over the entity results of the current batch; {@code null} before the first fetch
+ * and after a fetch that returned no results.
+ */
+ private java.util.Iterator<EntityResult> entities;
+
+ /**
+ * Current batch of query results. Its end cursor becomes the start cursor of the next query.
+ */
+ private QueryResultBatch currentBatch;
+
+ /**
+ * Maximum number of results to request per query.
+ *
+ * <p>Must be set, or it may result in an I/O error when querying
+ * Cloud Datastore.
+ */
+ private static final int QUERY_BATCH_LIMIT = 500;
+
+ /**
+ * Remaining user-requested limit on the number of entities to return. If the user did not set
+ * a limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will
+ * never be decremented.
+ */
+ private int userLimit;
+
+ /** Set once {@code advance()} finds no further entities; drives split-point reporting. */
+ private volatile boolean done = false;
+
+ /** The entity returned by {@code getCurrent()}; {@code null} once reading has finished. */
+ private Entity currentEntity;
+
+ /**
+ * Returns a DatastoreReader with DatastoreSource and Datastore object set.
+ *
+ * @param source the source whose query (including any user-specified limit) will be read.
+ * @param datastore a datastore connection to use.
+ */
+ public DatastoreReader(DatastoreSource source, Datastore datastore) {
+ this.source = source;
+ this.datastore = datastore;
+ // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE.
+ userLimit = source.query.hasLimit()
+ ? source.query.getLimit().getValue() : Integer.MAX_VALUE;
+ }
+
+ @Override
+ public Entity getCurrent() {
+ return currentEntity;
+ }
+
+ // The whole source is treated as one split point, which counts as consumed only once the
+ // reader has run out of entities.
+ @Override
+ public final long getSplitPointsConsumed() {
+ return done ? 1 : 0;
+ }
+
+ @Override
+ public final long getSplitPointsRemaining() {
+ return done ? 0 : 1;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ /**
+ * Moves to the next entity, fetching a new batch when there is no current batch yet or when
+ * the current one is exhausted and more results may exist.
+ *
+ * <p>NOTE(review): a fetched batch with zero results ends the read even if Datastore reported
+ * {@code NOT_FINISHED}; confirm the service never returns empty intermediate batches.
+ *
+ * @throws IOException wrapping any {@link DatastoreException} raised by the query
+ */
+ @Override
+ public boolean advance() throws IOException {
+ if (entities == null || (!entities.hasNext() && moreResults)) {
+ try {
+ entities = getIteratorAndMoveCursor();
+ } catch (DatastoreException e) {
+ throw new IOException(e);
+ }
+ }
+
+ // No batch (empty fetch) or an exhausted final batch: reading is finished.
+ if (entities == null || !entities.hasNext()) {
+ currentEntity = null;
+ done = true;
+ return false;
+ }
+
+ currentEntity = entities.next().getEntity();
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to release here.
+ }
+
+ @Override
+ public DatastoreSource getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public DatastoreSource splitAtFraction(double fraction) {
+ // Not supported.
+ return null;
+ }
+
+ @Override
+ public Double getFractionConsumed() {
+ // Not supported.
+ return null;
+ }
+
+ /**
+ * Runs the source query for the next batch and returns an iterator over its results.
+ *
+ * <p>The query limit is set to the smaller of the remaining user limit and
+ * {@code QUERY_BATCH_LIMIT}; when a previous batch with a non-empty end cursor exists, the
+ * query resumes from that cursor. As side effects this replaces {@code currentBatch},
+ * decrements {@code userLimit} when the user set a limit, and recomputes {@code moreResults}.
+ *
+ * @return an iterator over the batch's results, or {@code null} if the batch was empty
+ * @throws DatastoreException if the query fails
+ */
+ private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
+ Query.Builder query = source.query.toBuilder().clone();
+ query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT)));
+ if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+ query.setStartCursor(currentBatch.getEndCursor());
+ }
+
+ RunQueryRequest request = source.makeRequest(query.build());
+ RunQueryResponse response = datastore.runQuery(request);
+
+ currentBatch = response.getBatch();
+
+ // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
+ // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
+ // use result count to determine if more results might exist.
+ int numFetch = currentBatch.getEntityResultsCount();
+ if (source.query.hasLimit()) {
+ verify(userLimit >= numFetch,
+ "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
+ userLimit, numFetch, query.getLimit());
+ userLimit -= numFetch;
+ }
+ moreResults =
+ // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied.
+ (userLimit > 0)
+ // All indications from the API are that there are/may be more results.
+ && ((numFetch == QUERY_BATCH_LIMIT)
+ || (currentBatch.getMoreResults() == NOT_FINISHED));
+
+ // May receive a batch of 0 results if the number of records is a multiple
+ // of the request limit.
+ if (numFetch == 0) {
+ return null;
+ }
+
+ return currentBatch.getEntityResultsList().iterator();
+ }
+ }
+
+  /**
+   * A {@link org.apache.beam.sdk.io.Sink} that writes {@link Entity} objects to the Cloud
+   * Datastore project held in {@code projectId}.
+   */
+  static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
+    /** Destination project; {@link #validate} rejects {@code null}. */
+    final String projectId;
+
+    public DatastoreSink(String projectId) {
+      this.projectId = projectId;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      checkNotNull(projectId, "projectId");
+    }
+
+    /** Creates the write operation that manages writers for this sink. */
+    @Override
+    public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
+      DatastoreWriteOperation operation = new DatastoreWriteOperation(this);
+      return operation;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(
+          DisplayData.item("projectId", projectId).withLabel("Output Project"));
+    }
+  }
+
+  /**
+   * The {@link WriteOperation} for a {@link DatastoreSink}. It creates {@link DatastoreWriter}s,
+   * each with its own Datastore client, and logs the total number of entities written once all
+   * writer results are available.
+   */
+  private static class DatastoreWriteOperation
+      extends WriteOperation<Entity, DatastoreWriteResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
+
+    private final DatastoreSink sink;
+
+    public DatastoreWriteOperation(DatastoreSink sink) {
+      this.sink = sink;
+    }
+
+    @Override
+    public DatastoreSink getSink() {
+      return sink;
+    }
+
+    @Override
+    public Coder<DatastoreWriteResult> getWriterResultCoder() {
+      return SerializableCoder.of(DatastoreWriteResult.class);
+    }
+
+    /** No setup is required before writers start. */
+    @Override
+    public void initialize(PipelineOptions options) throws Exception {
+    }
+
+    /**
+     * Creates a writer backed by a Datastore client for the sink's project, using the GCP
+     * credential from {@code options} when one is available.
+     */
+    @Override
+    public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
+      DatastoreOptions.Builder clientOptions = new DatastoreOptions.Builder()
+          .projectId(sink.projectId)
+          .initializer(new RetryHttpRequestInitializer());
+      Credential gcpCredential = options.as(GcpOptions.class).getGcpCredential();
+      if (gcpCredential != null) {
+        clientOptions.credential(gcpCredential);
+      }
+      return new DatastoreWriter(this, DatastoreFactory.get().create(clientOptions.build()));
+    }
+
+    /** Sums the per-writer entity counts and logs the total. */
+    @Override
+    public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
+        throws Exception {
+      long total = 0L;
+      for (DatastoreWriteResult writerResult : writerResults) {
+        total += writerResult.entitiesWritten;
+      }
+      LOG.info("Wrote {} elements.", total);
+    }
+  }
+
+ /**
+ * {@link Writer} that writes entities to a Datastore Sink. Entities are written in batches,
+ * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. Entities
+ * are committed as upsert mutations (either update if the key already exists, or insert if it is
+ * a new key). If an entity does not have a complete key (i.e., it has no name or id), the bundle
+ * will fail.
+ *
+ * <p>See <a
+ * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
+ * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
+ * mutations.
+ *
+ * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity
+ * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}
+ * times).
+ *
+ * <p>Visible for testing purposes.
+ */
+ @VisibleForTesting
+ static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
+ private final DatastoreWriteOperation writeOp;
+ private final Datastore datastore;
+ private long totalWritten = 0;
+
+ // Visible for testing.
+ final List<Entity> entities = new ArrayList<>();
+
+ /**
+ * Since a bundle is written in batches, we should retry the commit of a batch in order to
+ * prevent transient errors from causing the bundle to fail.
+ */
+ private static final int MAX_RETRIES = 5;
+
+ /**
+ * Initial backoff time for exponential backoff for retry attempts.
+ */
+ private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+ /**
+ * Returns true if a Datastore key is complete. A key is complete if its last element
+ * has either an id or a name.
+ */
+ static boolean isValidKey(Key key) {
+ List<PathElement> elementList = key.getPathList();
+ if (elementList.isEmpty()) {
+ return false;
+ }
+ PathElement lastElement = elementList.get(elementList.size() - 1);
+ return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+ }
+
+ DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
+ this.writeOp = writeOp;
+ this.datastore = datastore;
+ }
+
+ @Override
+ public void open(String uId) throws Exception {}
+
+ /**
+ * Writes an entity to the Datastore. Writes are batched, up to {@link
+ * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
+ * {@link IllegalArgumentException} will be thrown.
+ */
+ @Override
+ public void write(Entity value) throws Exception {
+ // Verify that the entity to write has a complete key.
+ if (!isValidKey(value.getKey())) {
+ throw new IllegalArgumentException(
+ "Entities to be written to the Datastore must have complete keys");
+ }
+
+ entities.add(value);
+
+ if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
+ flushBatch();
+ }
+ }
+
+ /**
+ * Flushes any pending batch writes and returns a DatastoreWriteResult.
+ */
+ @Override
+ public DatastoreWriteResult close() throws Exception {
+ if (entities.size() > 0) {
+ flushBatch();
+ }
+ return new DatastoreWriteResult(totalWritten);
+ }
+
+ @Override
+ public DatastoreWriteOperation getWriteOperation() {
+ return writeOp;
+ }
+
+ /**
+ * Writes a batch of entities to the Datastore.
+ *
+ * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
+ * times). All entities in the batch will be committed again, even if the commit was partially
+ * successful. If the retry limit is exceeded, the last exception from the Datastore will be
+ * thrown.
+ *
+ * @throws DatastoreException if the commit fails or IOException or InterruptedException if
+ * backing off between retries fails.
+ */
+ private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+ LOG.debug("Writing batch of {} entities", entities.size());
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+ while (true) {
+ // Batch upsert entities.
+ try {
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ for (Entity entity: entities) {
+ commitRequest.addMutations(makeUpsert(entity));
+ }
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ datastore.commit(commitRequest.build());
+ // Break if the commit threw no exception.
+ break;
+ } catch (DatastoreException exception) {
+ // Only log the code and message for potentially-transient errors. The entire exception
+ // will be propagated upon the last retry.
+ LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+ exception.getMessage());
+ if (!BackOffUtils.next(sleeper, backoff)) {
+ LOG.error("Aborting after {} retries.", MAX_RETRIES);
+ throw exception;
+ }
+ }
+ }
+ totalWritten += entities.size();
+ LOG.debug("Successfully wrote {} entities", entities.size());
+ entities.clear();
+ }
+ }
+
+  /**
+   * Result returned by each {@link DatastoreWriter} when it is closed: the number of entities it
+   * committed. Serializable so that it can be encoded with {@link SerializableCoder}.
+   */
+  private static class DatastoreWriteResult implements Serializable {
+    /** Number of entities committed by a single writer. */
+    final long entitiesWritten;
+
+    public DatastoreWriteResult(long recordsWritten) {
+      entitiesWritten = recordsWritten;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java
new file mode 100644
index 0000000..1ca0266
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides an API for reading from and writing to
+ * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
+ * versions of the Datastore Client libraries.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
new file mode 100644
index 0000000..a60e7c5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreReader;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreSource;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriter;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.common.collect.Lists;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.KindExpression;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.PropertyFilter;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Tests for {@link V1Beta3}.
+ *
+ * <p>Most tests exercise reads and writes against a Mockito mock of {@link Datastore}, and query
+ * splitting against mock {@link QuerySplitter}s.
+ */
+@RunWith(JUnit4.class)
+public class V1Beta3Test {
+  private static final String PROJECT_ID = "testProject";
+  private static final String NAMESPACE = "testNamespace";
+  private static final String KIND = "testKind";
+  private static final Query QUERY;
+  static {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(KIND);
+    QUERY = q.build();
+  }
+  private V1Beta3.Read initialRead;
+
+  @Mock
+  Datastore mockDatastore;
+
+  @Rule
+  public final ExpectedException thrown = ExpectedException.none();
+
+  @Rule public final ExpectedLogs logged = ExpectedLogs.none(DatastoreSource.class);
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    initialRead = DatastoreIO.v1beta3().read()
+        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+  }
+
+  /**
+   * Helper function to create test {@link GcpOptions} that authenticate with a
+   * {@link TestCredential}.
+   */
+  static final GcpOptions testPipelineOptions() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setGcpCredential(new TestCredential());
+    return options;
+  }
+
+  @Test
+  public void testBuildRead() throws Exception {
+    V1Beta3.Read read = DatastoreIO.v1beta3().read()
+        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+    assertEquals(QUERY, read.getQuery());
+    assertEquals(PROJECT_ID, read.getProjectId());
+    assertEquals(NAMESPACE, read.getNamespace());
+  }
+
+  /**
+   * {@link #testBuildRead} but constructed in a different order.
+   */
+  @Test
+  public void testBuildReadAlt() throws Exception {
+    V1Beta3.Read read = DatastoreIO.v1beta3().read()
+        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
+    assertEquals(QUERY, read.getQuery());
+    assertEquals(PROJECT_ID, read.getProjectId());
+    assertEquals(NAMESPACE, read.getNamespace());
+  }
+
+  @Test
+  public void testReadValidationFailsProject() throws Exception {
+    V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(QUERY);
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("project");
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadValidationFailsQuery() throws Exception {
+    V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("query");
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadValidationFailsQueryLimitZero() throws Exception {
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid query limit 0: must be positive");
+
+    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
+  }
+
+  @Test
+  public void testReadValidationFailsQueryLimitNegative() throws Exception {
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid query limit -5: must be positive");
+
+    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
+  }
+
+  @Test
+  public void testReadValidationSucceedsNamespace() throws Exception {
+    V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
+    /* Should succeed, as a null namespace is fine. */
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadDisplayData() {
+    V1Beta3.Read read = DatastoreIO.v1beta3().read()
+        .withProjectId(PROJECT_ID)
+        .withQuery(QUERY)
+        .withNamespace(NAMESPACE);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+    assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
+    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
+        "myProject").withQuery(Query.newBuilder().build());
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+    assertThat("DatastoreIO read should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
+  public void testWriteDoesNotAllowNullProject() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    DatastoreIO.v1beta3().write().withProjectId(null);
+  }
+
+  @Test
+  public void testWriteValidationFailsWithNoProject() throws Exception {
+    V1Beta3.Write write = DatastoreIO.v1beta3().write();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    write.validate(null);
+  }
+
+  @Test
+  public void testSinkValidationSucceedsWithProject() throws Exception {
+    V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
+    write.validate(null);
+  }
+
+  @Test
+  public void testWriteDisplayData() {
+    V1Beta3.Write write = DatastoreIO.v1beta3().write()
+        .withProjectId(PROJECT_ID);
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testSinkPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Entity>, ?> write =
+        DatastoreIO.v1beta3().write().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
+  public void testQuerySplitBasic() throws Exception {
+    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
+    Query query = Query.newBuilder().addKind(mykind).build();
+
+    List<Query> mockSplits = new ArrayList<>();
+    for (int i = 0; i < 8; ++i) {
+      mockSplits.add(
+          Query.newBuilder()
+              .addKind(mykind)
+              .setFilter(
+                  DatastoreHelper.makeFilter("foo", PropertyFilter.Operator.EQUAL,
+                      Value.newBuilder().setIntegerValue(i).build()))
+              .build());
+    }
+
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    /* No namespace */
+    PartitionId partition = PartitionId.newBuilder().build();
+    when(splitter.getSplits(any(Query.class), eq(partition), eq(8), any(Datastore.class)))
+        .thenReturn(mockSplits);
+
+    DatastoreSource io = initialRead
+        .withNamespace(null)
+        .withQuery(query)
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(8 * 1024L);
+
+    List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
+    assertEquals(8, bundles.size());
+    for (int i = 0; i < 8; ++i) {
+      DatastoreSource bundle = bundles.get(i);
+      Query bundleQuery = bundle.getQuery();
+      assertEquals("mykind", bundleQuery.getKind(0).getName());
+      assertEquals(i, bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue());
+    }
+  }
+
+  /**
+   * Verifies that when namespace is set in the source, the split request includes the namespace.
+   */
+  @Test
+  public void testSourceWithNamespace() throws Exception {
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    DatastoreSource io = initialRead
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(8 * 1024L);
+
+    io.splitIntoBundles(1024, testPipelineOptions());
+
+    PartitionId partition = PartitionId.newBuilder().setNamespaceId(NAMESPACE).build();
+    verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class));
+    verifyNoMoreInteractions(splitter);
+  }
+
+  @Test
+  public void testQuerySplitWithZeroSize() throws Exception {
+    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
+    Query query = Query.newBuilder().addKind(mykind).build();
+
+    List<Query> mockSplits = Lists.newArrayList(
+        Query.newBuilder()
+            .addKind(mykind)
+            .build());
+
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)))
+        .thenReturn(mockSplits);
+
+    DatastoreSource io = initialRead
+        .withQuery(query)
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(0L);
+
+    List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
+    assertEquals(1, bundles.size());
+    // With a zero-byte size estimate the splitter must not be consulted for any split count;
+    // checking only a count of 1 would miss calls with other counts.
+    verify(splitter, never())
+        .getSplits(
+            any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class));
+    DatastoreSource bundle = bundles.get(0);
+    Query bundleQuery = bundle.getQuery();
+    assertEquals("mykind", bundleQuery.getKind(0).getName());
+    assertFalse(bundleQuery.hasFilter());
+  }
+
+  /**
+   * Tests that a query with a user-provided limit field does not split, and does not even
+   * interact with a query splitter.
+   */
+  @Test
+  public void testQueryDoesNotSplitWithLimitSet() throws Exception {
+    // Minimal query with a limit
+    Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build();
+
+    // Mock query splitter, should not be invoked.
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), any(Datastore.class)))
+        .thenThrow(new AssertionError("Splitter should not be invoked"));
+
+    List<DatastoreSource> bundles =
+        initialRead
+            .withQuery(query)
+            .getSource()
+            .withMockSplitter(splitter)
+            .splitIntoBundles(1024, testPipelineOptions());
+
+    assertEquals(1, bundles.size());
+    assertEquals(query, bundles.get(0).getQuery());
+    verifyNoMoreInteractions(splitter);
+  }
+
+  /**
+   * Tests that when {@link QuerySplitter} cannot split a query, {@link V1Beta3} falls back to
+   * a single split.
+   */
+  @Test
+  public void testQuerySplitterThrows() throws Exception {
+    // Mock query splitter that throws IllegalArgumentException
+    IllegalArgumentException exception =
+        new IllegalArgumentException("query not supported by splitter");
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(
+            splitter.getSplits(
+                any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class)))
+        .thenThrow(exception);
+
+    Query query = Query.newBuilder().addKind(KindExpression.newBuilder().setName("myKind")).build();
+    List<DatastoreSource> bundles =
+        initialRead
+            .withQuery(query)
+            .getSource()
+            .withMockSplitter(splitter)
+            .withMockEstimateSizeBytes(10240L)
+            .splitIntoBundles(1024, testPipelineOptions());
+
+    assertEquals(1, bundles.size());
+    assertEquals(query, bundles.get(0).getQuery());
+    verify(splitter, times(1))
+        .getSplits(
+            any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class));
+    logged.verifyWarn("Unable to parallelize the given query", exception);
+  }
+
+  @Test
+  public void testQuerySplitSizeUnavailable() throws Exception {
+    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
+    Query query = Query.newBuilder().addKind(mykind).build();
+
+    List<Query> mockSplits = Lists.newArrayList(Query.newBuilder().addKind(mykind).build());
+
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(12), any(Datastore.class)))
+        .thenReturn(mockSplits);
+
+    DatastoreSource io = initialRead
+        .withQuery(query)
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(8 * 1024L);
+
+    DatastoreSource spiedIo = spy(io);
+    when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class)))
+        .thenThrow(new NoSuchElementException());
+
+    List<DatastoreSource> bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions());
+    assertEquals(1, bundles.size());
+    // Without a size estimate the source is expected to fall back to the 12-split request
+    // stubbed above; verifying "never called with 1" could not fail here.
+    verify(splitter, times(1))
+        .getSplits(any(Query.class), any(PartitionId.class), eq(12), any(Datastore.class));
+    DatastoreSource bundle = bundles.get(0);
+    Query bundleQuery = bundle.getQuery();
+    assertEquals("mykind", bundleQuery.getKind(0).getName());
+    assertFalse(bundleQuery.hasFilter());
+  }
+
+  /**
+   * Test building a Write using builder methods.
+   */
+  @Test
+  public void testBuildWrite() throws Exception {
+    V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
+    assertEquals(PROJECT_ID, write.getProjectId());
+  }
+
+  /**
+   * Test the detection of complete and incomplete keys.
+   */
+  @Test
+  public void testHasNameOrId() {
+    Key key;
+    // Complete with name, no ancestor
+    key = makeKey("bird", "finch").build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Complete with id, no ancestor
+    key = makeKey("bird", 123).build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Incomplete, no ancestor
+    key = makeKey("bird").build();
+    assertFalse(DatastoreWriter.isValidKey(key));
+
+    // Complete with name and ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", "horned").build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Complete with id and ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", 123).build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Incomplete with ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird").build();
+    assertFalse(DatastoreWriter.isValidKey(key));
+
+    // Key with no path elements at all
+    key = makeKey().build();
+    assertFalse(DatastoreWriter.isValidKey(key));
+  }
+
+  /**
+   * Test that entities with incomplete keys cannot be updated.
+   */
+  @Test
+  public void testAddEntitiesWithIncompleteKeys() throws Exception {
+    Key key = makeKey("bird").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
+
+    writer.write(entity);
+  }
+
+  /**
+   * Test that entities are added to the batch to update.
+   */
+  @Test
+  public void testAddingEntities() throws Exception {
+    List<Entity> expected = Lists.newArrayList(
+        Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
+        Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
+        Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
+
+    List<Entity> allEntities = Lists.newArrayList(expected);
+    Collections.shuffle(allEntities);
+
+    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
+    writer.open("test_id");
+    for (Entity entity : allEntities) {
+      writer.write(entity);
+    }
+
+    assertEquals(expected.size(), writer.entities.size());
+    assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
+  }
+
+  /** Datastore batch API limit in number of records per query. */
+  private static final int DATASTORE_QUERY_BATCH_LIMIT = 500;
+
+  /**
+   * A helper function that creates mock {@link Entity} results in response to a query. Always
+   * indicates that more results are available, unless the batch is limited to fewer than
+   * {@link #DATASTORE_QUERY_BATCH_LIMIT} results.
+   */
+  private static RunQueryResponse mockResponseForQuery(Query q) {
+    // Every query V1Beta3 sends should have a limit.
+    assertTrue(q.hasLimit());
+
+    // The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT]
+    int limit = q.getLimit().getValue();
+    assertThat(limit, greaterThanOrEqualTo(1));
+    assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT));
+
+    // Create the requested number of entities.
+    List<EntityResult> entities = new ArrayList<>(limit);
+    for (int i = 0; i < limit; ++i) {
+      entities.add(
+          EntityResult.newBuilder()
+              .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
+              .build());
+    }
+
+    // Fill out the other parameters on the returned result batch.
+    RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
+    ret.getBatchBuilder()
+        .addAllEntityResults(entities)
+        .setEntityResultType(EntityResult.ResultType.FULL)
+        .setMoreResults(
+            limit == DATASTORE_QUERY_BATCH_LIMIT
+                ? QueryResultBatch.MoreResultsType.NOT_FINISHED
+                : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
+
+    return ret.build();
+  }
+
+  /**
+   * Helper function to run a test reading from a limited-result query and check that exactly
+   * {@code numEntities} results are returned. {@code numEntities} must be at least 1.
+   */
+  private void runQueryLimitReadTest(int numEntities) throws Exception {
+    // A query that sets only a limit; it is used to read entities.
+    Query query = Query.newBuilder().setLimit(
+        Int32Value.newBuilder().setValue(numEntities)).build();
+    V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(query).withProjectId("mockProject");
+
+    // Use mockResponseForQuery to generate results.
+    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
+        .thenAnswer(
+            new Answer<RunQueryResponse>() {
+              @Override
+              public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable {
+                Query q = ((RunQueryRequest) invocation.getArguments()[0]).getQuery();
+                return mockResponseForQuery(q);
+              }
+            });
+
+    // Actually instantiate the reader.
+    DatastoreReader reader = new DatastoreReader(read.getSource(), mockDatastore);
+
+    // Simply count the number of results returned by the reader. The reader is closed even if
+    // an assertion fails part-way through.
+    int resultCount;
+    try {
+      assertTrue(reader.start());
+      resultCount = 1;
+      while (reader.advance()) {
+        resultCount++;
+      }
+    } finally {
+      reader.close();
+    }
+
+    // Validate the number of results.
+    assertEquals(numEntities, resultCount);
+  }
+
+  /** Tests reading with a query limit less than one batch. */
+  @Test
+  public void testReadingWithLimitOneBatch() throws Exception {
+    runQueryLimitReadTest(5);
+  }
+
+  /** Tests reading with a query limit more than one batch, and not a multiple. */
+  @Test
+  public void testReadingWithLimitMultipleBatches() throws Exception {
+    runQueryLimitReadTest(DATASTORE_QUERY_BATCH_LIMIT + 5);
+  }
+
+  /** Tests reading several batches, using an exact multiple of batch size results. */
+  @Test
+  public void testReadingWithLimitMultipleBatchesExactMultiple() throws Exception {
+    runQueryLimitReadTest(5 * DATASTORE_QUERY_BATCH_LIMIT);
+  }
+}
[3/5] incubator-beam git commit: Add io-gcp to top-level pom
dependency management
Posted by lc...@apache.org.
Add io-gcp to top-level pom dependency management
So its version is inherited everywhere that depends on it
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15dff04d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15dff04d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15dff04d
Branch: refs/heads/master
Commit: 15dff04d9fd73b5443b2213ae1dfe8b998a87c74
Parents: 1cccbac
Author: Dan Halperin <dh...@google.com>
Authored: Sat Jul 2 12:10:10 2016 -0700
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:06:19 2016 -0400
----------------------------------------------------------------------
pom.xml | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15dff04d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 72d73fb..7089c2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -271,6 +271,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>${project.version}</version>
</dependency>
[2/5] incubator-beam git commit: Move Datastore from sdks/java/core
to io/gcp
Posted by lc...@apache.org.
Move Datastore from sdks/java/core to io/gcp
* Remove DataflowDatastoreIOTest. Dataflow runner translation of display
data should be tested via code in the same module.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2ccd685
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2ccd685
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2ccd685
Branch: refs/heads/master
Commit: c2ccd685d82aff06ea4a41e84841e653513c4050
Parents: 15dff04
Author: Dan Halperin <dh...@google.com>
Authored: Sat Jul 2 12:10:34 2016 -0700
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:06:19 2016 -0400
----------------------------------------------------------------------
examples/java/pom.xml | 5 +
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/cookbook/DatastoreWordCount.java | 4 +-
sdks/java/core/pom.xml | 10 -
.../beam/sdk/io/datastore/DatastoreIO.java | 41 -
.../apache/beam/sdk/io/datastore/V1Beta3.java | 992 -------------------
.../beam/sdk/io/datastore/package-info.java | 24 -
.../beam/sdk/io/datastore/V1Beta3Test.java | 617 ------------
sdks/java/io/google-cloud-platform/pom.xml | 28 +
.../beam/sdk/io/gcp/datastore/DatastoreIO.java | 41 +
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 992 +++++++++++++++++++
.../beam/sdk/io/gcp/datastore/package-info.java | 24 +
.../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 617 ++++++++++++
13 files changed, 1710 insertions(+), 1687 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 0d2f505..ca16f51 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -204,6 +204,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 708aa87..e6cc0cc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 36af202..847523b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -25,8 +25,8 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.datastore.DatastoreIO;
-import org.apache.beam.sdk.io.datastore.V1Beta3;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 9ec8f3d..bda77cb 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -383,16 +383,6 @@
</dependency>
<dependency>
- <groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-proto-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-protos</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcsio</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java
deleted file mode 100644
index d5043f2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.datastore;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * <p>{@link DatastoreIO} provides an API for reading from and writing to
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
- * versions of the Datastore Client libraries.
- *
- * <p>To use the v1beta3 version see {@link V1Beta3}.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class DatastoreIO {
-
- private DatastoreIO() {}
-
- /**
- * Returns a {@link V1Beta3} that provides an API for accessing Datastore through v1beta3 version
- * of Datastore Client library.
- */
- public static V1Beta3 v1beta3() {
- return new V1Beta3();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java
deleted file mode 100644
index 0b9f709..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java
+++ /dev/null
@@ -1,992 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Ints;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreHelper;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * <p>{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
- * {@link Entity} objects.
- *
- * <p>This API currently requires an authentication workaround. To use {@link V1Beta3}, users
- * must use the {@code gcloud} command line tool to get credentials for Datastore:
- * <pre>
- * $ gcloud auth login
- * </pre>
- *
- * <p>To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and
- * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to
- * specify the project to query and the query to read from. You can optionally provide a namespace
- * to query within using {@link V1Beta3.Read#withNamespace}.
- *
- * <p>For example:
- *
- * <pre> {@code
- * // Read a query from Datastore
- * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- * Query query = ...;
- * String projectId = "...";
- *
- * Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(
- * DatastoreIO.v1beta3().read()
- * .withProjectId(projectId)
- * .withQuery(query));
- * } </pre>
- *
- * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
- * many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
- * all returned results will be read by a single Dataflow worker in order to ensure correct data.
- *
- * <p>To write a {@link PCollection} to a Datastore, use {@link V1Beta3#write},
- * specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
- * {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the
- * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than
- * {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}.
- *
- * <pre>{@code
- * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
- * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
- * }</pre>
- *
- * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read
- * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and
- * Keys</a> for more information about {@code Entity} keys.
- *
- * <p><h3>Permissions</h3>
- * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
- * more details.
- *
- * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up
- * </a>for security and permission related information specific to Datastore.
- *
- * @see org.apache.beam.sdk.runners.PipelineRunner
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class V1Beta3 {
-
- // A package-private constructor to prevent direct instantiation from outside of this package
- V1Beta3() {}
-
- /**
- * Datastore has a limit of 500 mutations per batch operation, so we flush
- * changes to Datastore every 500 entities.
- */
- private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
-
- /**
- * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
- * {@code query}, and optionally {@code namespace} using {@link V1Beta3.Read#withProjectId},
- * {@link V1Beta3.Read#withQuery}, and {@link V1Beta3.Read#withNamespace}.
- */
- public V1Beta3.Read read() {
- return new V1Beta3.Read(null, null, null);
- }
-
- /**
- * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity}
- * objects.
- *
- * @see DatastoreIO
- */
- public static class Read extends PTransform<PBegin, PCollection<Entity>> {
- @Nullable
- private final String projectId;
-
- @Nullable
- private final Query query;
-
- @Nullable
- private final String namespace;
-
- /**
- * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
- * {@code null} as a matter of build order, but if they are {@code null} at instantiation time,
- * an error will be thrown.
- */
- private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace) {
- this.projectId = projectId;
- this.query = query;
- this.namespace = namespace;
- }
-
- /**
- * Returns a new {@link V1Beta3.Read} that reads from the Datastore for the specified project.
- */
- public V1Beta3.Read withProjectId(String projectId) {
- checkNotNull(projectId, "projectId");
- return new V1Beta3.Read(projectId, query, namespace);
- }
-
- /**
- * Returns a new {@link V1Beta3.Read} that reads the results of the specified query.
- *
- * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
- * across many workers. However, when the {@link Query} is configured with a limit using
- * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
- * to ensure correct results.
- */
- public V1Beta3.Read withQuery(Query query) {
- checkNotNull(query, "query");
- checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
- "Invalid query limit %s: must be positive", query.getLimit().getValue());
- return new V1Beta3.Read(projectId, query, namespace);
- }
-
- /**
- * Returns a new {@link V1Beta3.Read} that reads from the given namespace.
- */
- public V1Beta3.Read withNamespace(String namespace) {
- return new V1Beta3.Read(projectId, query, namespace);
- }
-
- @Nullable
- public Query getQuery() {
- return query;
- }
-
- @Nullable
- public String getProjectId() {
- return projectId;
- }
-
- @Nullable
- public String getNamespace() {
- return namespace;
- }
-
- @Override
- public PCollection<Entity> apply(PBegin input) {
- return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
- }
-
- @Override
- public void validate(PBegin input) {
- checkNotNull(projectId, "projectId");
- checkNotNull(query, "query");
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("ProjectId"))
- .addIfNotNull(DisplayData.item("namespace", namespace)
- .withLabel("Namespace"))
- .addIfNotNull(DisplayData.item("query", query.toString())
- .withLabel("Query"));
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("projectId", projectId)
- .add("query", query)
- .add("namespace", namespace)
- .toString();
- }
-
- @VisibleForTesting
- DatastoreSource getSource() {
- return new DatastoreSource(projectId, query, namespace);
- }
- }
-
- /**
- * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
- * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
- */
- public Write write() {
- return new Write(null);
- }
-
- /**
- * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
- *
- * @see DatastoreIO
- */
- public static class Write extends PTransform<PCollection<Entity>, PDone> {
- @Nullable
- private final String projectId;
-
- /**
- * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
- * it is {@code null} at instantiation time, an error will be thrown.
- */
- public Write(@Nullable String projectId) {
- this.projectId = projectId;
- }
-
- /**
- * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project.
- */
- public Write withProjectId(String projectId) {
- checkNotNull(projectId, "projectId");
- return new Write(projectId);
- }
-
- @Override
- public PDone apply(PCollection<Entity> input) {
- return input.apply(
- org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
- }
-
- @Override
- public void validate(PCollection<Entity> input) {
- checkNotNull(projectId, "projectId");
- }
-
- @Nullable
- public String getProjectId() {
- return projectId;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("projectId", projectId)
- .toString();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("Output Project"));
- }
- }
-
- /**
- * A {@link org.apache.beam.sdk.io.Source} that reads data from Datastore.
- */
- static class DatastoreSource extends BoundedSource<Entity> {
-
- @Override
- public Coder<Entity> getDefaultOutputCoder() {
- return ProtoCoder.of(Entity.class);
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) {
- return false;
- }
-
- @Override
- public List<DatastoreSource> splitIntoBundles(long desiredBundleSizeBytes,
- PipelineOptions options) throws Exception {
- // Users may request a limit on the number of results. We can currently support this by
- // simply disabling parallel reads and using only a single split.
- if (query.hasLimit()) {
- return ImmutableList.of(this);
- }
-
- long numSplits;
- try {
- numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes);
- } catch (Exception e) {
- // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible.
- numSplits = 12;
- }
-
- // If the desiredBundleSize or number of workers results in 1 split, simply return
- // a source that reads from the original query.
- if (numSplits <= 1) {
- return ImmutableList.of(this);
- }
-
- List<Query> datastoreSplits;
- try {
- datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options);
- } catch (IllegalArgumentException | DatastoreException e) {
- LOG.warn("Unable to parallelize the given query: {}", query, e);
- return ImmutableList.of(this);
- }
-
- ImmutableList.Builder<DatastoreSource> splits = ImmutableList.builder();
- for (Query splitQuery : datastoreSplits) {
- splits.add(new DatastoreSource(projectId, splitQuery, namespace));
- }
- return splits.build();
- }
-
- @Override
- public BoundedReader<Entity> createReader(PipelineOptions pipelineOptions) throws IOException {
- return new DatastoreReader(this, getDatastore(pipelineOptions));
- }
-
- @Override
- public void validate() {
- checkNotNull(query, "query");
- checkNotNull(projectId, "projectId");
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- // Datastore provides no way to get a good estimate of how large the result of a query
- // will be. As a rough approximation, we attempt to fetch the statistics of the whole
- // entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind
- // is specified in the query.
- //
- // See https://cloud.google.com/datastore/docs/concepts/stats
- if (mockEstimateSizeBytes != null) {
- return mockEstimateSizeBytes;
- }
-
- Datastore datastore = getDatastore(options);
- if (query.getKindCount() != 1) {
- throw new UnsupportedOperationException(
- "Can only estimate size for queries specifying exactly 1 kind.");
- }
- String ourKind = query.getKind(0).getName();
- long latestTimestamp = queryLatestStatisticsTimestamp(datastore);
- Query.Builder query = Query.newBuilder();
- if (namespace == null) {
- query.addKindBuilder().setName("__Stat_Kind__");
- } else {
- query.addKindBuilder().setName("__Ns_Stat_Kind__");
- }
- query.setFilter(makeAndFilter(
- makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
- makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
- RunQueryRequest request = makeRequest(query.build());
-
- long now = System.currentTimeMillis();
- RunQueryResponse response = datastore.runQuery(request);
- LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
-
- QueryResultBatch batch = response.getBatch();
- if (batch.getEntityResultsCount() == 0) {
- throw new NoSuchElementException(
- "Datastore statistics for kind " + ourKind + " unavailable");
- }
- Entity entity = batch.getEntityResults(0).getEntity();
- return entity.getProperties().get("entity_bytes").getIntegerValue();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("ProjectId"))
- .addIfNotNull(DisplayData.item("namespace", namespace)
- .withLabel("Namespace"))
- .addIfNotNull(DisplayData.item("query", query.toString())
- .withLabel("Query"));
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("projectId", projectId)
- .add("query", query)
- .add("namespace", namespace)
- .toString();
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreSource.class);
- private final String projectId;
- private final Query query;
- @Nullable
- private final String namespace;
-
- /** For testing only. TODO: This could be much cleaner with dependency injection. */
- @Nullable
- private QuerySplitter mockSplitter;
- @Nullable
- private Long mockEstimateSizeBytes;
-
- DatastoreSource(String projectId, Query query, @Nullable String namespace) {
- this.projectId = projectId;
- this.query = query;
- this.namespace = namespace;
- }
-
- /**
- * A helper function to get the split queries, taking into account the optional
- * {@code namespace} and whether there is a mock splitter.
- */
- private List<Query> getSplitQueries(int numSplits, PipelineOptions options)
- throws DatastoreException {
- // If namespace is set, include it in the split request so splits are calculated accordingly.
- PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
- if (namespace != null) {
- partitionBuilder.setNamespaceId(namespace);
- }
-
- if (mockSplitter != null) {
- // For testing.
- return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null);
- }
-
- return DatastoreHelper.getQuerySplitter().getSplits(
- query, partitionBuilder.build(), numSplits, getDatastore(options));
- }
-
- /**
- * Builds a {@link RunQueryRequest} from the {@code query}, using the properties set on this
- * {@code DatastoreSource}. For example, sets the {@code namespace} for the request.
- */
- private RunQueryRequest makeRequest(Query query) {
- RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
- if (namespace != null) {
- requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
- }
- return requestBuilder.build();
- }
-
- /**
- * Datastore system tables with statistics are periodically updated. This method fetches
- * the latest timestamp of statistics update using the {@code __Stat_Total__} table.
- */
- private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException {
- Query.Builder query = Query.newBuilder();
- query.addKindBuilder().setName("__Stat_Total__");
- query.addOrder(makeOrder("timestamp", DESCENDING));
- query.setLimit(Int32Value.newBuilder().setValue(1));
- RunQueryRequest request = makeRequest(query.build());
-
- long now = System.currentTimeMillis();
- RunQueryResponse response = datastore.runQuery(request);
- LOG.info("Query for latest stats timestamp of project {} took {}ms", projectId,
- System.currentTimeMillis() - now);
- QueryResultBatch batch = response.getBatch();
- if (batch.getEntityResultsCount() == 0) {
- throw new NoSuchElementException(
- "Datastore total statistics for project " + projectId + " unavailable");
- }
- Entity entity = batch.getEntityResults(0).getEntity();
- return entity.getProperties().get("timestamp").getTimestampValue().getNanos();
- }
-
- private Datastore getDatastore(PipelineOptions pipelineOptions) {
- DatastoreOptions.Builder builder =
- new DatastoreOptions.Builder()
- .projectId(projectId)
- .initializer(
- new RetryHttpRequestInitializer()
- );
-
- Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
- if (credential != null) {
- builder.credential(credential);
- }
- return DatastoreFactory.get().create(builder.build());
- }
-
- /** For testing only. */
- DatastoreSource withMockSplitter(QuerySplitter splitter) {
- DatastoreSource res = new DatastoreSource(projectId, query, namespace);
- res.mockSplitter = splitter;
- res.mockEstimateSizeBytes = mockEstimateSizeBytes;
- return res;
- }
-
- /** For testing only. */
- DatastoreSource withMockEstimateSizeBytes(Long estimateSizeBytes) {
- DatastoreSource res = new DatastoreSource(projectId, query, namespace);
- res.mockSplitter = mockSplitter;
- res.mockEstimateSizeBytes = estimateSizeBytes;
- return res;
- }
-
- @VisibleForTesting
- Query getQuery() {
- return query;
- }
- }
-
- /**
- * A {@link DatastoreSource.Reader} over the records from a query of the datastore.
- *
- * <p>Timestamped records are currently not supported.
- * All records implicitly have the timestamp of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
- */
- @VisibleForTesting
- static class DatastoreReader extends BoundedSource.BoundedReader<Entity> {
- private final DatastoreSource source;
-
- /**
- * Datastore to read from.
- */
- private final Datastore datastore;
-
- /**
- * True if more results may be available.
- */
- private boolean moreResults;
-
- /**
- * Iterator over records.
- */
- private java.util.Iterator<EntityResult> entities;
-
- /**
- * Current batch of query results.
- */
- private QueryResultBatch currentBatch;
-
- /**
- * Maximum number of results to request per query.
- *
- * <p>Must be set, or it may result in an I/O error when querying
- * Cloud Datastore.
- */
- private static final int QUERY_BATCH_LIMIT = 500;
-
- /**
- * Remaining user-requested limit on the number of sources to return. If the user did not set a
- * limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will never
- * be decremented.
- */
- private int userLimit;
-
- private volatile boolean done = false;
-
- private Entity currentEntity;
-
- /**
- * Returns a DatastoreReader with DatastoreSource and Datastore object set.
- *
- * @param datastore a datastore connection to use.
- */
- public DatastoreReader(DatastoreSource source, Datastore datastore) {
- this.source = source;
- this.datastore = datastore;
- // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE.
- userLimit = source.query.hasLimit()
- ? source.query.getLimit().getValue() : Integer.MAX_VALUE;
- }
-
- @Override
- public Entity getCurrent() {
- return currentEntity;
- }
-
- @Override
- public final long getSplitPointsConsumed() {
- return done ? 1 : 0;
- }
-
- @Override
- public final long getSplitPointsRemaining() {
- return done ? 0 : 1;
- }
-
- @Override
- public boolean start() throws IOException {
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- if (entities == null || (!entities.hasNext() && moreResults)) {
- try {
- entities = getIteratorAndMoveCursor();
- } catch (DatastoreException e) {
- throw new IOException(e);
- }
- }
-
- if (entities == null || !entities.hasNext()) {
- currentEntity = null;
- done = true;
- return false;
- }
-
- currentEntity = entities.next().getEntity();
- return true;
- }
-
- @Override
- public void close() throws IOException {
- // Nothing
- }
-
- @Override
- public DatastoreSource getCurrentSource() {
- return source;
- }
-
- @Override
- public DatastoreSource splitAtFraction(double fraction) {
- // Not supported.
- return null;
- }
-
- @Override
- public Double getFractionConsumed() {
- // Not supported.
- return null;
- }
-
- /**
- * Returns an iterator over the next batch of records for the query
- * and updates the cursor to get the next batch as needed.
- * Query has specified limit and offset from InputSplit.
- */
- private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
- Query.Builder query = source.query.toBuilder().clone();
- query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT)));
- if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
- query.setStartCursor(currentBatch.getEndCursor());
- }
-
- RunQueryRequest request = source.makeRequest(query.build());
- RunQueryResponse response = datastore.runQuery(request);
-
- currentBatch = response.getBatch();
-
- // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
- // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
- // use result count to determine if more results might exist.
- int numFetch = currentBatch.getEntityResultsCount();
- if (source.query.hasLimit()) {
- verify(userLimit >= numFetch,
- "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
- userLimit, numFetch, query.getLimit());
- userLimit -= numFetch;
- }
- moreResults =
- // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied.
- (userLimit > 0)
- // All indications from the API are that there are/may be more results.
- && ((numFetch == QUERY_BATCH_LIMIT)
- || (currentBatch.getMoreResults() == NOT_FINISHED));
-
- // May receive a batch of 0 results if the number of records is a multiple
- // of the request limit.
- if (numFetch == 0) {
- return null;
- }
-
- return currentBatch.getEntityResultsList().iterator();
- }
- }
-
- /**
- * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore.
- */
- static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
- final String projectId;
-
- public DatastoreSink(String projectId) {
- this.projectId = projectId;
- }
-
- @Override
- public void validate(PipelineOptions options) {
- checkNotNull(projectId, "projectId");
- }
-
- @Override
- public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
- return new DatastoreWriteOperation(this);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("Output Project"));
- }
- }
-
- /**
- * A {@link WriteOperation} that will manage a parallel write to a Datastore sink.
- */
- private static class DatastoreWriteOperation
- extends WriteOperation<Entity, DatastoreWriteResult> {
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
-
- private final DatastoreSink sink;
-
- public DatastoreWriteOperation(DatastoreSink sink) {
- this.sink = sink;
- }
-
- @Override
- public Coder<DatastoreWriteResult> getWriterResultCoder() {
- return SerializableCoder.of(DatastoreWriteResult.class);
- }
-
- @Override
- public void initialize(PipelineOptions options) throws Exception {}
-
- /**
- * Finalizes the write. Logs the number of entities written to the Datastore.
- */
- @Override
- public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
- throws Exception {
- long totalEntities = 0;
- for (DatastoreWriteResult result : writerResults) {
- totalEntities += result.entitiesWritten;
- }
- LOG.info("Wrote {} elements.", totalEntities);
- }
-
- @Override
- public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
- DatastoreOptions.Builder builder =
- new DatastoreOptions.Builder()
- .projectId(sink.projectId)
- .initializer(new RetryHttpRequestInitializer());
- Credential credential = options.as(GcpOptions.class).getGcpCredential();
- if (credential != null) {
- builder.credential(credential);
- }
- Datastore datastore = DatastoreFactory.get().create(builder.build());
-
- return new DatastoreWriter(this, datastore);
- }
-
- @Override
- public DatastoreSink getSink() {
- return sink;
- }
- }
-
- /**
- * {@link Writer} that writes entities to a Datastore Sink. Entities are written in batches,
- * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. Entities
- * are committed as upsert mutations (either update if the key already exists, or insert if it is
- * a new key). If an entity does not have a complete key (i.e., it has no name or id), the bundle
- * will fail.
- *
- * <p>See <a
- * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
- * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
- * mutations.
- *
- * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity
- * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}
- * times).
- *
- * <p>Visible for testing purposes.
- */
- @VisibleForTesting
- static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
- private final DatastoreWriteOperation writeOp;
- private final Datastore datastore;
- private long totalWritten = 0;
-
- // Visible for testing.
- final List<Entity> entities = new ArrayList<>();
-
- /**
- * Since a bundle is written in batches, we should retry the commit of a batch in order to
- * prevent transient errors from causing the bundle to fail.
- */
- private static final int MAX_RETRIES = 5;
-
- /**
- * Initial backoff time for exponential backoff for retry attempts.
- */
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
- /**
- * Returns true if a Datastore key is complete. A key is complete if its last element
- * has either an id or a name.
- */
- static boolean isValidKey(Key key) {
- List<PathElement> elementList = key.getPathList();
- if (elementList.isEmpty()) {
- return false;
- }
- PathElement lastElement = elementList.get(elementList.size() - 1);
- return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
- }
-
- DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
- this.writeOp = writeOp;
- this.datastore = datastore;
- }
-
- @Override
- public void open(String uId) throws Exception {}
-
- /**
- * Writes an entity to the Datastore. Writes are batched, up to {@link
- * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
- * {@link IllegalArgumentException} will be thrown.
- */
- @Override
- public void write(Entity value) throws Exception {
- // Verify that the entity to write has a complete key.
- if (!isValidKey(value.getKey())) {
- throw new IllegalArgumentException(
- "Entities to be written to the Datastore must have complete keys");
- }
-
- entities.add(value);
-
- if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
- flushBatch();
- }
- }
-
- /**
- * Flushes any pending batch writes and returns a DatastoreWriteResult.
- */
- @Override
- public DatastoreWriteResult close() throws Exception {
- if (entities.size() > 0) {
- flushBatch();
- }
- return new DatastoreWriteResult(totalWritten);
- }
-
- @Override
- public DatastoreWriteOperation getWriteOperation() {
- return writeOp;
- }
-
- /**
- * Writes a batch of entities to the Datastore.
- *
- * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
- * times). All entities in the batch will be committed again, even if the commit was partially
- * successful. If the retry limit is exceeded, the last exception from the Datastore will be
- * thrown.
- *
- * @throws DatastoreException if the commit fails or IOException or InterruptedException if
- * backing off between retries fails.
- */
- private void flushBatch() throws DatastoreException, IOException, InterruptedException {
- LOG.debug("Writing batch of {} entities", entities.size());
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
- while (true) {
- // Batch upsert entities.
- try {
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- for (Entity entity: entities) {
- commitRequest.addMutations(makeUpsert(entity));
- }
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- datastore.commit(commitRequest.build());
- // Break if the commit threw no exception.
- break;
- } catch (DatastoreException exception) {
- // Only log the code and message for potentially-transient errors. The entire exception
- // will be propagated upon the last retry.
- LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
- exception.getMessage());
- if (!BackOffUtils.next(sleeper, backoff)) {
- LOG.error("Aborting after {} retries.", MAX_RETRIES);
- throw exception;
- }
- }
- }
- totalWritten += entities.size();
- LOG.debug("Successfully wrote {} entities", entities.size());
- entities.clear();
- }
- }
-
- private static class DatastoreWriteResult implements Serializable {
- final long entitiesWritten;
-
- public DatastoreWriteResult(long recordsWritten) {
- this.entitiesWritten = recordsWritten;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java
deleted file mode 100644
index f687739..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <p>Provides an API for reading from and writing to
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
- * versions of the Datastore Client libraries.
- */
-package org.apache.beam.sdk.io.datastore;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
deleted file mode 100644
index dd22289..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
+++ /dev/null
@@ -1,617 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.datastore;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.datastore.V1Beta3.DatastoreReader;
-import org.apache.beam.sdk.io.datastore.V1Beta3.DatastoreSource;
-import org.apache.beam.sdk.io.datastore.V1Beta3.DatastoreWriter;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.collect.Lists;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.KindExpression;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.Value;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreHelper;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * Tests for {@link V1Beta3}.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3Test {
- private static final String PROJECT_ID = "testProject";
- private static final String NAMESPACE = "testNamespace";
- private static final String KIND = "testKind";
- private static final Query QUERY;
- static {
- Query.Builder q = Query.newBuilder();
- q.addKindBuilder().setName(KIND);
- QUERY = q.build();
- }
- private V1Beta3.Read initialRead;
-
- @Mock
- Datastore mockDatastore;
-
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
-
- @Rule public final ExpectedLogs logged = ExpectedLogs.none(DatastoreSource.class);
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- initialRead = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
- }
-
- /**
- * Helper function to create a test {@code DataflowPipelineOptions}.
- */
- static final GcpOptions testPipelineOptions() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
- options.setGcpCredential(new TestCredential());
- return options;
- }
-
- @Test
- public void testBuildRead() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId());
- assertEquals(NAMESPACE, read.getNamespace());
- }
-
- /**
- * {@link #testBuildRead} but constructed in a different order.
- */
- @Test
- public void testBuildReadAlt() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId());
- assertEquals(NAMESPACE, read.getNamespace());
- }
-
- @Test
- public void testReadValidationFailsProject() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(QUERY);
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("project");
- read.validate(null);
- }
-
- @Test
- public void testReadValidationFailsQuery() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("query");
- read.validate(null);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitZero() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit 0: must be positive");
-
- DatastoreIO.v1beta3().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitNegative() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit -5: must be positive");
-
- DatastoreIO.v1beta3().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationSucceedsNamespace() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
- /* Should succeed, as a null namespace is fine. */
- read.validate(null);
- }
-
- @Test
- public void testReadDisplayData() {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID)
- .withQuery(QUERY)
- .withNamespace(NAMESPACE);
-
- DisplayData displayData = DisplayData.from(read);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
- assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testSourcePrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
- "myProject").withQuery(Query.newBuilder().build());
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat("DatastoreIO read should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- }
-
- @Test
- public void testWriteDoesNotAllowNullProject() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- DatastoreIO.v1beta3().write().withProjectId(null);
- }
-
- @Test
- public void testWriteValidationFailsWithNoProject() throws Exception {
- V1Beta3.Write write = DatastoreIO.v1beta3().write();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- write.validate(null);
- }
-
- @Test
- public void testSinkValidationSucceedsWithProject() throws Exception {
- V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
- write.validate(null);
- }
-
- @Test
- public void testWriteDisplayData() {
- V1Beta3.Write write = DatastoreIO.v1beta3().write()
- .withProjectId(PROJECT_ID);
-
- DisplayData displayData = DisplayData.from(write);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testSinkPrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1beta3().write().withProjectId("myProject");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- }
-
- @Test
- public void testQuerySplitBasic() throws Exception {
- KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
- Query query = Query.newBuilder().addKind(mykind).build();
-
- List<Query> mockSplits = new ArrayList<>();
- for (int i = 0; i < 8; ++i) {
- mockSplits.add(
- Query.newBuilder()
- .addKind(mykind)
- .setFilter(
- DatastoreHelper.makeFilter("foo", PropertyFilter.Operator.EQUAL,
- Value.newBuilder().setIntegerValue(i).build()))
- .build());
- }
-
- QuerySplitter splitter = mock(QuerySplitter.class);
- /* No namespace */
- PartitionId partition = PartitionId.newBuilder().build();
- when(splitter.getSplits(any(Query.class), eq(partition), eq(8), any(Datastore.class)))
- .thenReturn(mockSplits);
-
- DatastoreSource io = initialRead
- .withNamespace(null)
- .withQuery(query)
- .getSource()
- .withMockSplitter(splitter)
- .withMockEstimateSizeBytes(8 * 1024L);
-
- List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
- assertEquals(8, bundles.size());
- for (int i = 0; i < 8; ++i) {
- DatastoreSource bundle = bundles.get(i);
- Query bundleQuery = bundle.getQuery();
- assertEquals("mykind", bundleQuery.getKind(0).getName());
- assertEquals(i, bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue());
- }
- }
-
- /**
- * Verifies that when namespace is set in the source, the split request includes the namespace.
- */
- @Test
- public void testSourceWithNamespace() throws Exception {
- QuerySplitter splitter = mock(QuerySplitter.class);
- DatastoreSource io = initialRead
- .getSource()
- .withMockSplitter(splitter)
- .withMockEstimateSizeBytes(8 * 1024L);
-
- io.splitIntoBundles(1024, testPipelineOptions());
-
- PartitionId partition = PartitionId.newBuilder().setNamespaceId(NAMESPACE).build();
- verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class));
- verifyNoMoreInteractions(splitter);
- }
-
- @Test
- public void testQuerySplitWithZeroSize() throws Exception {
- KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
- Query query = Query.newBuilder().addKind(mykind).build();
-
- List<Query> mockSplits = Lists.newArrayList(
- Query.newBuilder()
- .addKind(mykind)
- .build());
-
- QuerySplitter splitter = mock(QuerySplitter.class);
- when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)))
- .thenReturn(mockSplits);
-
- DatastoreSource io = initialRead
- .withQuery(query)
- .getSource()
- .withMockSplitter(splitter)
- .withMockEstimateSizeBytes(0L);
-
- List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
- assertEquals(1, bundles.size());
- verify(splitter, never())
- .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class));
- DatastoreSource bundle = bundles.get(0);
- Query bundleQuery = bundle.getQuery();
- assertEquals("mykind", bundleQuery.getKind(0).getName());
- assertFalse(bundleQuery.hasFilter());
- }
-
- /**
- * Tests that a query with a user-provided limit field does not split, and does not even
- * interact with a query splitter.
- */
- @Test
- public void testQueryDoesNotSplitWithLimitSet() throws Exception {
- // Minimal query with a limit
- Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build();
-
- // Mock query splitter, should not be invoked.
- QuerySplitter splitter = mock(QuerySplitter.class);
- when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), any(Datastore.class)))
- .thenThrow(new AssertionError("Splitter should not be invoked"));
-
- List<DatastoreSource> bundles =
- initialRead
- .withQuery(query)
- .getSource()
- .withMockSplitter(splitter)
- .splitIntoBundles(1024, testPipelineOptions());
-
- assertEquals(1, bundles.size());
- assertEquals(query, bundles.get(0).getQuery());
- verifyNoMoreInteractions(splitter);
- }
-
- /**
- * Tests that when {@link QuerySplitter} cannot split a query, {@link V1Beta3} falls back to
- * a single split.
- */
- @Test
- public void testQuerySplitterThrows() throws Exception {
- // Mock query splitter that throws IllegalArgumentException
- IllegalArgumentException exception =
- new IllegalArgumentException("query not supported by splitter");
- QuerySplitter splitter = mock(QuerySplitter.class);
- when(
- splitter.getSplits(
- any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class)))
- .thenThrow(exception);
-
- Query query = Query.newBuilder().addKind(KindExpression.newBuilder().setName("myKind")).build();
- List<DatastoreSource> bundles =
- initialRead
- .withQuery(query)
- .getSource()
- .withMockSplitter(splitter)
- .withMockEstimateSizeBytes(10240L)
- .splitIntoBundles(1024, testPipelineOptions());
-
- assertEquals(1, bundles.size());
- assertEquals(query, bundles.get(0).getQuery());
- verify(splitter, times(1))
- .getSplits(
- any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class));
- logged.verifyWarn("Unable to parallelize the given query", exception);
- }
-
- @Test
- public void testQuerySplitSizeUnavailable() throws Exception {
- KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
- Query query = Query.newBuilder().addKind(mykind).build();
-
- List<Query> mockSplits = Lists.newArrayList(Query.newBuilder().addKind(mykind).build());
-
- QuerySplitter splitter = mock(QuerySplitter.class);
- when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(12), any(Datastore.class)))
- .thenReturn(mockSplits);
-
- DatastoreSource io = initialRead
- .withQuery(query)
- .getSource()
- .withMockSplitter(splitter)
- .withMockEstimateSizeBytes(8 * 1024L);
-
- DatastoreSource spiedIo = spy(io);
- when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class)))
- .thenThrow(new NoSuchElementException());
-
- List<DatastoreSource> bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions());
- assertEquals(1, bundles.size());
- verify(splitter, never())
- .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class));
- DatastoreSource bundle = bundles.get(0);
- Query bundleQuery = bundle.getQuery();
- assertEquals("mykind", bundleQuery.getKind(0).getName());
- assertFalse(bundleQuery.hasFilter());
- }
-
- /**
- * Test building a Write using builder methods.
- */
- @Test
- public void testBuildWrite() throws Exception {
- V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
- assertEquals(PROJECT_ID, write.getProjectId());
- }
-
- /**
- * Test the detection of complete and incomplete keys.
- */
- @Test
- public void testHasNameOrId() {
- Key key;
- // Complete with name, no ancestor
- key = makeKey("bird", "finch").build();
- assertTrue(DatastoreWriter.isValidKey(key));
-
- // Complete with id, no ancestor
- key = makeKey("bird", 123).build();
- assertTrue(DatastoreWriter.isValidKey(key));
-
- // Incomplete, no ancestor
- key = makeKey("bird").build();
- assertFalse(DatastoreWriter.isValidKey(key));
-
- // Complete with name and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", "horned").build();
- assertTrue(DatastoreWriter.isValidKey(key));
-
- // Complete with id and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", 123).build();
- assertTrue(DatastoreWriter.isValidKey(key));
-
- // Incomplete with ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird").build();
- assertFalse(DatastoreWriter.isValidKey(key));
-
- key = makeKey().build();
- assertFalse(DatastoreWriter.isValidKey(key));
- }
-
- /**
- * Test that entities with incomplete keys cannot be updated.
- */
- @Test
- public void testAddEntitiesWithIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
-
- writer.write(entity);
- }
-
- /**
- * Test that entities are added to the batch to update.
- */
- @Test
- public void testAddingEntities() throws Exception {
- List<Entity> expected = Lists.newArrayList(
- Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
- Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
- Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
-
- List<Entity> allEntities = Lists.newArrayList(expected);
- Collections.shuffle(allEntities);
-
- DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
- writer.open("test_id");
- for (Entity entity : allEntities) {
- writer.write(entity);
- }
-
- assertEquals(expected.size(), writer.entities.size());
- assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
- }
-
- /** Datastore batch API limit in number of records per query. */
- private static final int DATASTORE_QUERY_BATCH_LIMIT = 500;
-
- /**
- * A helper function that creates mock {@link Entity} results in response to a query. Always
- * indicates that more results are available, unless the batch is limited to fewer than
- * {@link #DATASTORE_QUERY_BATCH_LIMIT} results.
- */
- private static RunQueryResponse mockResponseForQuery(Query q) {
- // Every query V1Beta3 sends should have a limit.
- assertTrue(q.hasLimit());
-
- // The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT]
- int limit = q.getLimit().getValue();
- assertThat(limit, greaterThanOrEqualTo(1));
- assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT));
-
- // Create the requested number of entities.
- List<EntityResult> entities = new ArrayList<>(limit);
- for (int i = 0; i < limit; ++i) {
- entities.add(
- EntityResult.newBuilder()
- .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
- .build());
- }
-
- // Fill out the other parameters on the returned result batch.
- RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
- ret.getBatchBuilder()
- .addAllEntityResults(entities)
- .setEntityResultType(EntityResult.ResultType.FULL)
- .setMoreResults(
- limit == DATASTORE_QUERY_BATCH_LIMIT
- ? QueryResultBatch.MoreResultsType.NOT_FINISHED
- : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
-
- return ret.build();
- }
-
- /** Helper function to run a test reading from a limited-result query. */
- private void runQueryLimitReadTest(int numEntities) throws Exception {
- // An empty query to read entities.
- Query query = Query.newBuilder().setLimit(
- Int32Value.newBuilder().setValue(numEntities)).build();
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(query).withProjectId("mockProject");
-
- // Use mockResponseForQuery to generate results.
- when(mockDatastore.runQuery(any(RunQueryRequest.class)))
- .thenAnswer(
- new Answer<RunQueryResponse>() {
- @Override
- public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable {
- Query q = ((RunQueryRequest) invocation.getArguments()[0]).getQuery();
- return mockResponseForQuery(q);
- }
- });
-
- // Actually instantiate the reader.
- DatastoreReader reader = new DatastoreReader(read.getSource(), mockDatastore);
-
- // Simply count the number of results returned by the reader.
- assertTrue(reader.start());
- int resultCount = 1;
- while (reader.advance()) {
- resultCount++;
- }
- reader.close();
-
- // Validate the number of results.
- assertEquals(numEntities, resultCount);
- }
-
- /** Tests reading with a query limit less than one batch. */
- @Test
- public void testReadingWithLimitOneBatch() throws Exception {
- runQueryLimitReadTest(5);
- }
-
- /** Tests reading with a query limit more than one batch, and not a multiple. */
- @Test
- public void testReadingWithLimitMultipleBatches() throws Exception {
- runQueryLimitReadTest(DATASTORE_QUERY_BATCH_LIMIT + 5);
- }
-
- /** Tests reading several batches, using an exact multiple of batch size results. */
- @Test
- public void testReadingWithLimitMultipleBatchesExactMultiple() throws Exception {
- runQueryLimitReadTest(5 * DATASTORE_QUERY_BATCH_LIMIT);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 0a814c1..93165ff 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -103,6 +103,18 @@
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
+ <!-- Build dependencies -->
+
+ <dependency>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-proto-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-protos</artifactId>
+ </dependency>
+
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
@@ -133,6 +145,16 @@
</dependency>
<dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.oauth-client</groupId>
+ <artifactId>google-oauth-client</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -189,6 +211,12 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
new file mode 100644
index 0000000..bde0aba
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * <p>{@link DatastoreIO} provides an API for reading from and writing to
+ * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
+ * versions of the Datastore Client libraries.
+ *
+ * <p>To use the v1beta3 version see {@link V1Beta3}.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class DatastoreIO {
+
+ private DatastoreIO() {}
+
+ /**
+ * Returns a {@link V1Beta3} that provides an API for accessing Datastore through v1beta3 version
+ * of Datastore Client library.
+ */
+ public static V1Beta3 v1beta3() {
+ return new V1Beta3();
+ }
+}
[5/5] incubator-beam git commit: [BEAM-77] Move datastore into GCP IO package
Posted by lc...@apache.org.
[BEAM-77] Move datastore into GCP IO package
This closes #584
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf874d42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf874d42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf874d42
Branch: refs/heads/master
Commit: cf874d426a937740a8ea14fd5ddbfddad7d9ecd3
Parents: 9d70025 c2ccd68
Author: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Authored: Mon Jul 11 10:07:17 2016 -0400
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:07:17 2016 -0400
----------------------------------------------------------------------
examples/java/pom.xml | 5 +
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/cookbook/DatastoreWordCount.java | 4 +-
pom.xml | 6 +
runners/core-java/pom.xml | 27 +-
sdks/java/core/pom.xml | 10 -
.../beam/sdk/io/datastore/DatastoreIO.java | 41 -
.../apache/beam/sdk/io/datastore/V1Beta3.java | 992 -------------------
.../beam/sdk/io/datastore/package-info.java | 24 -
.../beam/sdk/io/datastore/V1Beta3Test.java | 617 ------------
sdks/java/io/google-cloud-platform/pom.xml | 28 +
.../beam/sdk/io/gcp/datastore/DatastoreIO.java | 41 +
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 992 +++++++++++++++++++
.../beam/sdk/io/gcp/datastore/package-info.java | 24 +
.../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 617 ++++++++++++
15 files changed, 1722 insertions(+), 1708 deletions(-)
----------------------------------------------------------------------
[4/5] incubator-beam git commit: Clean up duplicate dependencies in runners core java pom
Posted by lc...@apache.org.
Clean up duplicate dependencies in runners core java pom
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cccbacf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cccbacf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cccbacf
Branch: refs/heads/master
Commit: 1cccbacf6ccc5f1942848ba812cdb3d3d5a2e6c5
Parents: 9d70025
Author: Dan Halperin <dh...@google.com>
Authored: Sat Jul 2 12:09:13 2016 -0700
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:06:19 2016 -0400
----------------------------------------------------------------------
runners/core-java/pom.xml | 27 ++++++---------------------
1 file changed, 6 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cccbacf/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index c7eea4a..fc8be0a 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -182,46 +182,31 @@
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
+ <!-- build dependencies -->
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <!-- build dependencies -->
-
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>annotations</artifactId>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
</dependency>
<dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<!-- test dependencies -->