Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/11 14:07:34 UTC

[1/5] incubator-beam git commit: Move Datastore from sdks/java/core to io/gcp

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9d7002545 -> cf874d426


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
new file mode 100644
index 0000000..0ba4433
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -0,0 +1,992 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Verify.verify;
+import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Sink.WriteOperation;
+import org.apache.beam.sdk.io.Sink.Writer;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import com.google.datastore.v1beta3.CommitRequest;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Key.PathElement;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreException;
+import com.google.datastore.v1beta3.client.DatastoreFactory;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.DatastoreOptions;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * <p>{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of
+ * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
+ * {@link Entity} objects.
+ *
+ * <p>This API currently requires an authentication workaround. To use {@link V1Beta3}, users
+ * must use the {@code gcloud} command line tool to get credentials for Datastore:
+ * <pre>
+ * $ gcloud auth login
+ * </pre>
+ *
+ * <p>To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and
+ * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to
+ * specify the project to query and the query to read from. You can optionally provide a namespace
+ * to query within using {@link V1Beta3.Read#withNamespace}.
+ *
+ * <p>For example:
+ *
+ * <pre> {@code
+ * // Read a query from Datastore
+ * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+ * Query query = ...;
+ * String projectId = "...";
+ *
+ * Pipeline p = Pipeline.create(options);
+ * PCollection<Entity> entities = p.apply(
+ *     DatastoreIO.v1beta3().read()
+ *         .withProjectId(projectId)
+ *         .withQuery(query));
+ * } </pre>
+ *
+ * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
+ * many workers. However, when the {@link Query} is configured with a limit using
+ * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
+ * all returned results will be read by a single Dataflow worker in order to ensure correct data.
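+ *
+ * <p>As an illustrative sketch, such a limited query could be built from an existing
+ * {@code query} as follows (the limit value {@code 1000} is arbitrary):
+ *
+ * <pre> {@code
+ * Query limitedQuery = query.toBuilder()
+ *     .setLimit(Int32Value.newBuilder().setValue(1000))
+ *     .build();
+ * } </pre>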
+ *
+ * <p>To write a {@link PCollection} to a Datastore, use {@link V1Beta3#write},
+ * specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
+ * {@link Key Keys}. Complete {@code Keys} specify the {@code name} or {@code id} of the
+ * {@code Entity}, whereas incomplete {@code Keys} do not. A {@code namespace} other than the
+ * project default may be used by specifying it in the {@code Entity} {@code Keys}:
+ *
+ * <pre>{@code
+ * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
+ * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+ * }</pre>
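+ *
+ * <p>For example, a complete key with a {@code name} might be built as follows (the kind and
+ * name used here are placeholders):
+ *
+ * <pre>{@code
+ * Key key = DatastoreHelper.makeKey("MyKind", "myName").build();
+ * Entity entity = Entity.newBuilder().setKey(key).build();
+ * }</pre>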
+ *
+ * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read
+ * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and
+ * Keys</a> for more information about {@code Entity} keys.
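+ *
+ * <p>Internally, each {@code Entity} becomes an upsert mutation in a non-transactional
+ * {@code CommitRequest}, roughly as follows (a sketch of what the sink's writer does per
+ * entity):
+ *
+ * <pre>{@code
+ * commitRequest.addMutations(DatastoreHelper.makeUpsert(entity));
+ * }</pre>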
+ *
+ * <h3>Permissions</h3>
+ * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
+ * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
+ * more details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up
+ * </a> for security and permission-related information specific to Datastore.
+ *
+ * @see org.apache.beam.sdk.runners.PipelineRunner
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class V1Beta3 {
+
+  // A package-private constructor to prevent direct instantiation from outside of this package
+  V1Beta3() {}
+
+  /**
+   * Datastore has a limit of 500 mutations per batch operation, so we flush
+   * changes to Datastore every 500 entities.
+   */
+  private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+
+  /**
+   * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
+   * {@code query}, and optionally {@code namespace} using {@link V1Beta3.Read#withProjectId},
+   * {@link V1Beta3.Read#withQuery}, and {@link V1Beta3.Read#withNamespace}.
+   */
+  public V1Beta3.Read read() {
+    return new V1Beta3.Read(null, null, null);
+  }
+
+  /**
+   * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity}
+   * objects.
+   *
+   * @see DatastoreIO
+   */
+  public static class Read extends PTransform<PBegin, PCollection<Entity>> {
+    @Nullable
+    private final String projectId;
+
+    @Nullable
+    private final Query query;
+
+    @Nullable
+    private final String namespace;
+
+    /**
+     * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
+     * {@code null} as a matter of build order, but if they are still {@code null} when the
+     * pipeline is validated, an error will be thrown.
+     */
+    private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace) {
+      this.projectId = projectId;
+      this.query = query;
+      this.namespace = namespace;
+    }
+
+    /**
+     * Returns a new {@link V1Beta3.Read} that reads from the Datastore for the specified project.
+     */
+    public V1Beta3.Read withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new V1Beta3.Read(projectId, query, namespace);
+    }
+
+    /**
+     * Returns a new {@link V1Beta3.Read} that reads the results of the specified query.
+     *
+     * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
+     * across many workers. However, when the {@link Query} is configured with a limit using
+     * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
+     * to ensure correct results.
+     */
+    public V1Beta3.Read withQuery(Query query) {
+      checkNotNull(query, "query");
+      checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
+          "Invalid query limit %s: must be positive", query.getLimit().getValue());
+      return new V1Beta3.Read(projectId, query, namespace);
+    }
+
+    /**
+     * Returns a new {@link V1Beta3.Read} that reads from the given namespace.
+     */
+    public V1Beta3.Read withNamespace(String namespace) {
+      return new V1Beta3.Read(projectId, query, namespace);
+    }
+
+    @Nullable
+    public Query getQuery() {
+      return query;
+    }
+
+    @Nullable
+    public String getProjectId() {
+      return projectId;
+    }
+
+    @Nullable
+    public String getNamespace() {
+      return namespace;
+    }
+
+    @Override
+    public PCollection<Entity> apply(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(projectId, "projectId");
+      checkNotNull(query, "query");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("ProjectId"))
+          .addIfNotNull(DisplayData.item("namespace", namespace)
+              .withLabel("Namespace"))
+          .addIfNotNull(DisplayData.item("query", query.toString())
+              .withLabel("Query"));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .add("query", query)
+          .add("namespace", namespace)
+          .toString();
+    }
+
+    @VisibleForTesting
+    DatastoreSource getSource() {
+      return new DatastoreSource(projectId, query, namespace);
+    }
+  }
+
+  /**
+   * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
+   * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
+   */
+  public Write write() {
+    return new Write(null);
+  }
+
+  /**
+   * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
+   *
+   * @see DatastoreIO
+   */
+  public static class Write extends PTransform<PCollection<Entity>, PDone> {
+    @Nullable
+    private final String projectId;
+
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is still {@code null} when the pipeline is validated, an error will be thrown.
+     */
+    public Write(@Nullable String projectId) {
+      this.projectId = projectId;
+    }
+
+    /**
+     * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project.
+     */
+    public Write withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new Write(projectId);
+    }
+
+    @Override
+    public PDone apply(PCollection<Entity> input) {
+      return input.apply(
+          org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
+    }
+
+    @Override
+    public void validate(PCollection<Entity> input) {
+      checkNotNull(projectId, "projectId");
+    }
+
+    @Nullable
+    public String getProjectId() {
+      return projectId;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .toString();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("Output Project"));
+    }
+  }
+
+  /**
+   * A {@link org.apache.beam.sdk.io.Source} that reads data from Datastore.
+   */
+  static class DatastoreSource extends BoundedSource<Entity> {
+
+    @Override
+    public Coder<Entity> getDefaultOutputCoder() {
+      return ProtoCoder.of(Entity.class);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) {
+      return false;
+    }
+
+    @Override
+    public List<DatastoreSource> splitIntoBundles(long desiredBundleSizeBytes,
+        PipelineOptions options) throws Exception {
+      // Users may request a limit on the number of results. We can currently support this by
+      // simply disabling parallel reads and using only a single split.
+      if (query.hasLimit()) {
+        return ImmutableList.of(this);
+      }
+
+      long numSplits;
+      try {
+        numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes);
+      } catch (Exception e) {
+        // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible.
+        numSplits = 12;
+      }
+
+      // If the desiredBundleSize or number of workers results in 1 split, simply return
+      // a source that reads from the original query.
+      if (numSplits <= 1) {
+        return ImmutableList.of(this);
+      }
+
+      List<Query> datastoreSplits;
+      try {
+        datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options);
+      } catch (IllegalArgumentException | DatastoreException e) {
+        LOG.warn("Unable to parallelize the given query: {}", query, e);
+        return ImmutableList.of(this);
+      }
+
+      ImmutableList.Builder<DatastoreSource> splits = ImmutableList.builder();
+      for (Query splitQuery : datastoreSplits) {
+        splits.add(new DatastoreSource(projectId, splitQuery, namespace));
+      }
+      return splits.build();
+    }
+
+    @Override
+    public BoundedReader<Entity> createReader(PipelineOptions pipelineOptions) throws IOException {
+      return new DatastoreReader(this, getDatastore(pipelineOptions));
+    }
+
+    @Override
+    public void validate() {
+      checkNotNull(query, "query");
+      checkNotNull(projectId, "projectId");
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      // Datastore provides no way to get a good estimate of how large the result of a query
+      // will be. As a rough approximation, we attempt to fetch the statistics of the whole
+      // entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind
+      // is specified in the query.
+      //
+      // See https://cloud.google.com/datastore/docs/concepts/stats
+      if (mockEstimateSizeBytes != null) {
+        return mockEstimateSizeBytes;
+      }
+
+      Datastore datastore = getDatastore(options);
+      if (query.getKindCount() != 1) {
+        throw new UnsupportedOperationException(
+            "Can only estimate size for queries specifying exactly 1 kind.");
+      }
+      String ourKind = query.getKind(0).getName();
+      long latestTimestamp = queryLatestStatisticsTimestamp(datastore);
+      Query.Builder query = Query.newBuilder();
+      if (namespace == null) {
+        query.addKindBuilder().setName("__Stat_Kind__");
+      } else {
+        query.addKindBuilder().setName("__Ns_Stat_Kind__");
+      }
+      query.setFilter(makeAndFilter(
+          makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
+          makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
+      RunQueryRequest request = makeRequest(query.build());
+
+      long now = System.currentTimeMillis();
+      RunQueryResponse response = datastore.runQuery(request);
+      LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
+
+      QueryResultBatch batch = response.getBatch();
+      if (batch.getEntityResultsCount() == 0) {
+        throw new NoSuchElementException(
+            "Datastore statistics for kind " + ourKind + " unavailable");
+      }
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("entity_bytes").getIntegerValue();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("ProjectId"))
+          .addIfNotNull(DisplayData.item("namespace", namespace)
+              .withLabel("Namespace"))
+          .addIfNotNull(DisplayData.item("query", query.toString())
+              .withLabel("Query"));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .add("query", query)
+          .add("namespace", namespace)
+          .toString();
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreSource.class);
+    private final String projectId;
+    private final Query query;
+    @Nullable
+    private final String namespace;
+
+    /** For testing only. TODO: This could be much cleaner with dependency injection. */
+    @Nullable
+    private QuerySplitter mockSplitter;
+    @Nullable
+    private Long mockEstimateSizeBytes;
+
+    DatastoreSource(String projectId, Query query, @Nullable String namespace) {
+      this.projectId = projectId;
+      this.query = query;
+      this.namespace = namespace;
+    }
+
+    /**
+     * A helper function to get the split queries, taking into account the optional
+     * {@code namespace} and whether there is a mock splitter.
+     */
+    private List<Query> getSplitQueries(int numSplits, PipelineOptions options)
+        throws DatastoreException {
+      // If namespace is set, include it in the split request so splits are calculated accordingly.
+      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
+      if (namespace != null) {
+        partitionBuilder.setNamespaceId(namespace);
+      }
+
+      if (mockSplitter != null) {
+        // For testing.
+        return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null);
+      }
+
+      return DatastoreHelper.getQuerySplitter().getSplits(
+          query, partitionBuilder.build(), numSplits, getDatastore(options));
+    }
+
+    /**
+     * Builds a {@link RunQueryRequest} from the {@code query}, using the properties set on this
+     * {@code DatastoreSource}. For example, sets the {@code namespace} for the request.
+     */
+    private RunQueryRequest makeRequest(Query query) {
+      RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
+      if (namespace != null) {
+        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+      }
+      return requestBuilder.build();
+    }
+
+    /**
+     * Datastore system tables with statistics are periodically updated. This method fetches
+     * the latest timestamp of statistics update using the {@code __Stat_Total__} table.
+     */
+    private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException {
+      Query.Builder query = Query.newBuilder();
+      query.addKindBuilder().setName("__Stat_Total__");
+      query.addOrder(makeOrder("timestamp", DESCENDING));
+      query.setLimit(Int32Value.newBuilder().setValue(1));
+      RunQueryRequest request = makeRequest(query.build());
+
+      long now = System.currentTimeMillis();
+      RunQueryResponse response = datastore.runQuery(request);
+      LOG.info("Query for latest stats timestamp of project {} took {}ms", projectId,
+          System.currentTimeMillis() - now);
+      QueryResultBatch batch = response.getBatch();
+      if (batch.getEntityResultsCount() == 0) {
+        throw new NoSuchElementException(
+            "Datastore total statistics for project " + projectId + " unavailable");
+      }
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("timestamp").getTimestampValue().getNanos();
+    }
+
+    private Datastore getDatastore(PipelineOptions pipelineOptions) {
+      DatastoreOptions.Builder builder =
+          new DatastoreOptions.Builder()
+              .projectId(projectId)
+              .initializer(
+                  new RetryHttpRequestInitializer()
+              );
+
+      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+      if (credential != null) {
+        builder.credential(credential);
+      }
+      return DatastoreFactory.get().create(builder.build());
+    }
+
+    /** For testing only. */
+    DatastoreSource withMockSplitter(QuerySplitter splitter) {
+      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
+      res.mockSplitter = splitter;
+      res.mockEstimateSizeBytes = mockEstimateSizeBytes;
+      return res;
+    }
+
+    /** For testing only. */
+    DatastoreSource withMockEstimateSizeBytes(Long estimateSizeBytes) {
+      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
+      res.mockSplitter = mockSplitter;
+      res.mockEstimateSizeBytes = estimateSizeBytes;
+      return res;
+    }
+
+    @VisibleForTesting
+    Query getQuery() {
+      return query;
+    }
+  }
+
+  /**
+   * A {@link BoundedSource.BoundedReader} over the records from a query of the datastore.
+   *
+   * <p>Timestamped records are currently not supported.
+   * All records implicitly have the timestamp of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
+   */
+  @VisibleForTesting
+  static class DatastoreReader extends BoundedSource.BoundedReader<Entity> {
+    private final DatastoreSource source;
+
+    /**
+     * Datastore to read from.
+     */
+    private final Datastore datastore;
+
+    /**
+     * True if more results may be available.
+     */
+    private boolean moreResults;
+
+    /**
+     * Iterator over records.
+     */
+    private java.util.Iterator<EntityResult> entities;
+
+    /**
+     * Current batch of query results.
+     */
+    private QueryResultBatch currentBatch;
+
+    /**
+     * Maximum number of results to request per query.
+     *
+     * <p>Must be set, or it may result in an I/O error when querying
+     * Cloud Datastore.
+     */
+    private static final int QUERY_BATCH_LIMIT = 500;
+
+    /**
+     * Remaining user-requested limit on the number of entities to return. If the user did not set
+     * a limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will
+     * limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will never
+     * be decremented.
+     */
+    private int userLimit;
+
+    private volatile boolean done = false;
+
+    private Entity currentEntity;
+
+    /**
+     * Creates a {@code DatastoreReader} that reads entities for the given {@code DatastoreSource}.
+     *
+     * @param datastore a Datastore connection to use.
+     */
+    public DatastoreReader(DatastoreSource source, Datastore datastore) {
+      this.source = source;
+      this.datastore = datastore;
+      // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE.
+      userLimit = source.query.hasLimit()
+          ? source.query.getLimit().getValue() : Integer.MAX_VALUE;
+    }
+
+    @Override
+    public Entity getCurrent() {
+      return currentEntity;
+    }
+
+    @Override
+    public final long getSplitPointsConsumed() {
+      return done ? 1 : 0;
+    }
+
+    @Override
+    public final long getSplitPointsRemaining() {
+      return done ? 0 : 1;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
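+      // Lazily fetch the first batch, or the next batch once the current iterator is exhausted
+      // and the previous response indicated that more results may be available.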
+      if (entities == null || (!entities.hasNext() && moreResults)) {
+        try {
+          entities = getIteratorAndMoveCursor();
+        } catch (DatastoreException e) {
+          throw new IOException(e);
+        }
+      }
+
+      if (entities == null || !entities.hasNext()) {
+        currentEntity = null;
+        done = true;
+        return false;
+      }
+
+      currentEntity = entities.next().getEntity();
+      return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing
+    }
+
+    @Override
+    public DatastoreSource getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public DatastoreSource splitAtFraction(double fraction) {
+      // Not supported.
+      return null;
+    }
+
+    @Override
+    public Double getFractionConsumed() {
+      // Not supported.
+      return null;
+    }
+
+    /**
+     * Returns an iterator over the next batch of records for the query
+     * and updates the cursor to get the next batch as needed.
+     * Each request carries a batch limit and resumes from the end cursor of the previous batch.
+     */
+    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
+      Query.Builder query = source.query.toBuilder().clone();
+      query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT)));
+      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+        query.setStartCursor(currentBatch.getEndCursor());
+      }
+
+      RunQueryRequest request = source.makeRequest(query.build());
+      RunQueryResponse response = datastore.runQuery(request);
+
+      currentBatch = response.getBatch();
+
+      // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
+      // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
+      // use result count to determine if more results might exist.
+      int numFetch = currentBatch.getEntityResultsCount();
+      if (source.query.hasLimit()) {
+        verify(userLimit >= numFetch,
+            "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
+            userLimit, numFetch, query.getLimit());
+        userLimit -= numFetch;
+      }
+      moreResults =
+          // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied.
+          (userLimit > 0)
+              // All indications from the API are that there are/may be more results.
+              && ((numFetch == QUERY_BATCH_LIMIT)
+              || (currentBatch.getMoreResults() == NOT_FINISHED));
+
+      // May receive a batch of 0 results if the number of records is a multiple
+      // of the request limit.
+      if (numFetch == 0) {
+        return null;
+      }
+
+      return currentBatch.getEntityResultsList().iterator();
+    }
+  }
+
+  /**
+   * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore.
+   */
+  static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
+    final String projectId;
+
+    public DatastoreSink(String projectId) {
+      this.projectId = projectId;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      checkNotNull(projectId, "projectId");
+    }
+
+    @Override
+    public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
+      return new DatastoreWriteOperation(this);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("Output Project"));
+    }
+  }
+
+  /**
+   * A {@link WriteOperation} that will manage a parallel write to a Datastore sink.
+   */
+  private static class DatastoreWriteOperation
+      extends WriteOperation<Entity, DatastoreWriteResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
+
+    private final DatastoreSink sink;
+
+    public DatastoreWriteOperation(DatastoreSink sink) {
+      this.sink = sink;
+    }
+
+    @Override
+    public Coder<DatastoreWriteResult> getWriterResultCoder() {
+      return SerializableCoder.of(DatastoreWriteResult.class);
+    }
+
+    @Override
+    public void initialize(PipelineOptions options) throws Exception {}
+
+    /**
+     * Finalizes the write.  Logs the number of entities written to the Datastore.
+     */
+    @Override
+    public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
+        throws Exception {
+      long totalEntities = 0;
+      for (DatastoreWriteResult result : writerResults) {
+        totalEntities += result.entitiesWritten;
+      }
+      LOG.info("Wrote {} elements.", totalEntities);
+    }
+
+    @Override
+    public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
+      DatastoreOptions.Builder builder =
+          new DatastoreOptions.Builder()
+              .projectId(sink.projectId)
+              .initializer(new RetryHttpRequestInitializer());
+      Credential credential = options.as(GcpOptions.class).getGcpCredential();
+      if (credential != null) {
+        builder.credential(credential);
+      }
+      Datastore datastore = DatastoreFactory.get().create(builder.build());
+
+      return new DatastoreWriter(this, datastore);
+    }
+
+    @Override
+    public DatastoreSink getSink() {
+      return sink;
+    }
+  }
+
+  /**
+   * {@link Writer} that writes entities to a Datastore Sink.  Entities are written in batches,
+   * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.  Entities
+   * are committed as upsert mutations (either update if the key already exists, or insert if it is
+   * a new key).  If an entity does not have a complete key (i.e., it has no name or id), the bundle
+   * will fail.
+   *
+   * <p>See <a
+   * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
+   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
+   * mutations.
+   *
+   * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
+   * group, the commit will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
+   * times).
+   *
+   * <p>Visible for testing purposes.
+   */
+  @VisibleForTesting
+  static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
+    private final DatastoreWriteOperation writeOp;
+    private final Datastore datastore;
+    private long totalWritten = 0;
+
+    // Visible for testing.
+    final List<Entity> entities = new ArrayList<>();
+
+    /**
+     * Since a bundle is written in batches, we should retry the commit of a batch in order to
+     * prevent transient errors from causing the bundle to fail.
+     */
+    private static final int MAX_RETRIES = 5;
+
+    /**
+     * Initial backoff time for exponential backoff for retry attempts.
+     */
+    private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+    /**
+     * Returns true if a Datastore key is complete.  A key is complete if its last element
+     * has either an id or a name.
+     */
+    static boolean isValidKey(Key key) {
+      List<PathElement> elementList = key.getPathList();
+      if (elementList.isEmpty()) {
+        return false;
+      }
+      PathElement lastElement = elementList.get(elementList.size() - 1);
+      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+    }
+
+    DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
+      this.writeOp = writeOp;
+      this.datastore = datastore;
+    }
+
+    @Override
+    public void open(String uId) throws Exception {}
+
+    /**
+     * Writes an entity to the Datastore.  Writes are batched, up to {@link
+     * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
+     * {@link IllegalArgumentException} will be thrown.
+     */
+    @Override
+    public void write(Entity value) throws Exception {
+      // Verify that the entity to write has a complete key.
+      if (!isValidKey(value.getKey())) {
+        throw new IllegalArgumentException(
+            "Entities to be written to the Datastore must have complete keys");
+      }
+
+      entities.add(value);
+
+      if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
+        flushBatch();
+      }
+    }
+
+    /**
+     * Flushes any pending batch writes and returns a DatastoreWriteResult.
+     */
+    @Override
+    public DatastoreWriteResult close() throws Exception {
+      if (entities.size() > 0) {
+        flushBatch();
+      }
+      return new DatastoreWriteResult(totalWritten);
+    }
+
+    @Override
+    public DatastoreWriteOperation getWriteOperation() {
+      return writeOp;
+    }
+
+    /**
+     * Writes a batch of entities to the Datastore.
+     *
+     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
+     * times).  All entities in the batch will be committed again, even if the commit was partially
+     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
+     * thrown.
+     *
+     * @throws DatastoreException if the commit fails
+     * @throws IOException if backing off between retries fails
+     * @throws InterruptedException if backing off between retries is interrupted
+     */
+    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+      LOG.debug("Writing batch of {} entities", entities.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+      while (true) {
+        // Batch upsert entities.
+        try {
+          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+          for (Entity entity: entities) {
+            commitRequest.addMutations(makeUpsert(entity));
+          }
+          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+          datastore.commit(commitRequest.build());
+          // Break if the commit threw no exception.
+          break;
+        } catch (DatastoreException exception) {
+          // Only log the code and message for potentially-transient errors. The entire exception
+          // will be propagated upon the last retry.
+          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      totalWritten += entities.size();
+      LOG.debug("Successfully wrote {} entities", entities.size());
+      entities.clear();
+    }
+  }
+
+  private static class DatastoreWriteResult implements Serializable {
+    final long entitiesWritten;
+
+    public DatastoreWriteResult(long recordsWritten) {
+      this.entitiesWritten = recordsWritten;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java
new file mode 100644
index 0000000..1ca0266
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>Provides an API for reading from and writing to
+ * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
+ * versions of the Datastore Client libraries.
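+ *
+ * <p>A minimal read pipeline, for example (the project ID and query here are placeholders):
+ *
+ * <pre>{@code
+ * PCollection<Entity> entities = pipeline.apply(
+ *     DatastoreIO.v1beta3().read().withProjectId("my-project").withQuery(query));
+ * }</pre>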
+ */
+package org.apache.beam.sdk.io.gcp.datastore;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
new file mode 100644
index 0000000..a60e7c5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreReader;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreSource;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriter;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.common.collect.Lists;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.KindExpression;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.PropertyFilter;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Tests for {@link V1Beta3}.
+ */
+@RunWith(JUnit4.class)
+public class V1Beta3Test {
+  private static final String PROJECT_ID = "testProject";
+  private static final String NAMESPACE = "testNamespace";
+  private static final String KIND = "testKind";
+  private static final Query QUERY;
+  static {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(KIND);
+    QUERY = q.build();
+  }
+  private V1Beta3.Read initialRead;
+
+  @Mock
+  Datastore mockDatastore;
+
+  @Rule
+  public final ExpectedException thrown = ExpectedException.none();
+
+  @Rule public final ExpectedLogs logged = ExpectedLogs.none(DatastoreSource.class);
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    initialRead = DatastoreIO.v1beta3().read()
+        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+  }
+
+  /**
+   * Helper function to create test {@link GcpOptions} with a {@link TestCredential}.
+   */
+  static final GcpOptions testPipelineOptions() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setGcpCredential(new TestCredential());
+    return options;
+  }
+
+  @Test
+  public void testBuildRead() throws Exception {
+    V1Beta3.Read read = DatastoreIO.v1beta3().read()
+        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+    assertEquals(QUERY, read.getQuery());
+    assertEquals(PROJECT_ID, read.getProjectId());
+    assertEquals(NAMESPACE, read.getNamespace());
+  }
+
+  /**
+   * {@link #testBuildRead} but constructed in a different order.
+   */
+  @Test
+  public void testBuildReadAlt() throws Exception {
+    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
+        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
+    assertEquals(QUERY, read.getQuery());
+    assertEquals(PROJECT_ID, read.getProjectId());
+    assertEquals(NAMESPACE, read.getNamespace());
+  }
+
+  @Test
+  public void testReadValidationFailsProject() throws Exception {
+    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withQuery(QUERY);
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("project");
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadValidationFailsQuery() throws Exception {
+    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("query");
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadValidationFailsQueryLimitZero() throws Exception {
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid query limit 0: must be positive");
+
+    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
+  }
+
+  @Test
+  public void testReadValidationFailsQueryLimitNegative() throws Exception {
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid query limit -5: must be positive");
+
+    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
+  }
+
+  @Test
+  public void testReadValidationSucceedsNamespace() throws Exception {
+    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
+    /* Should succeed, as a null namespace is fine. */
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadDisplayData() {
+    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
+      .withProjectId(PROJECT_ID)
+      .withQuery(QUERY)
+      .withNamespace(NAMESPACE);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+    assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
+    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
+        "myProject").withQuery(Query.newBuilder().build());
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+    assertThat("DatastoreIO read should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
+  public void testWriteDoesNotAllowNullProject() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    DatastoreIO.v1beta3().write().withProjectId(null);
+  }
+
+  @Test
+  public void testWriteValidationFailsWithNoProject() throws Exception {
+    V1Beta3.Write write =  DatastoreIO.v1beta3().write();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    write.validate(null);
+  }
+
+  @Test
+  public void testSinkValidationSucceedsWithProject() throws Exception {
+    V1Beta3.Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
+    write.validate(null);
+  }
+
+  @Test
+  public void testWriteDisplayData() {
+    V1Beta3.Write write =  DatastoreIO.v1beta3().write()
+        .withProjectId(PROJECT_ID);
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testSinkPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Entity>, ?> write =
+        DatastoreIO.v1beta3().write().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
+  public void testQuerySplitBasic() throws Exception {
+    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
+    Query query = Query.newBuilder().addKind(mykind).build();
+
+    List<Query> mockSplits = new ArrayList<>();
+    for (int i = 0; i < 8; ++i) {
+      mockSplits.add(
+          Query.newBuilder()
+              .addKind(mykind)
+              .setFilter(
+                  DatastoreHelper.makeFilter("foo", PropertyFilter.Operator.EQUAL,
+                      Value.newBuilder().setIntegerValue(i).build()))
+              .build());
+    }
+
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    /* No namespace */
+    PartitionId partition = PartitionId.newBuilder().build();
+    when(splitter.getSplits(any(Query.class), eq(partition), eq(8), any(Datastore.class)))
+        .thenReturn(mockSplits);
+
+    DatastoreSource io = initialRead
+        .withNamespace(null)
+        .withQuery(query)
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(8 * 1024L);
+
+    List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
+    assertEquals(8, bundles.size());
+    for (int i = 0; i < 8; ++i) {
+      DatastoreSource bundle = bundles.get(i);
+      Query bundleQuery = bundle.getQuery();
+      assertEquals("mykind", bundleQuery.getKind(0).getName());
+      assertEquals(i, bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue());
+    }
+  }
+
+  /**
+   * Verifies that when namespace is set in the source, the split request includes the namespace.
+   */
+  @Test
+  public void testSourceWithNamespace() throws Exception {
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    DatastoreSource io = initialRead
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(8 * 1024L);
+
+    io.splitIntoBundles(1024, testPipelineOptions());
+
+    PartitionId partition = PartitionId.newBuilder().setNamespaceId(NAMESPACE).build();
+    verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class));
+    verifyNoMoreInteractions(splitter);
+  }
+
+  @Test
+  public void testQuerySplitWithZeroSize() throws Exception {
+    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
+    Query query = Query.newBuilder().addKind(mykind).build();
+
+    List<Query> mockSplits = Lists.newArrayList(
+        Query.newBuilder()
+            .addKind(mykind)
+            .build());
+
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)))
+        .thenReturn(mockSplits);
+
+    DatastoreSource io = initialRead
+        .withQuery(query)
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(0L);
+
+    List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
+    assertEquals(1, bundles.size());
+    verify(splitter, never())
+        .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class));
+    DatastoreSource bundle = bundles.get(0);
+    Query bundleQuery = bundle.getQuery();
+    assertEquals("mykind", bundleQuery.getKind(0).getName());
+    assertFalse(bundleQuery.hasFilter());
+  }
+
+  /**
+   * Tests that a query with a user-provided limit field does not split, and does not even
+   * interact with a query splitter.
+   */
+  @Test
+  public void testQueryDoesNotSplitWithLimitSet() throws Exception {
+    // Minimal query with a limit
+    Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build();
+
+    // Mock query splitter, should not be invoked.
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), any(Datastore.class)))
+        .thenThrow(new AssertionError("Splitter should not be invoked"));
+
+    List<DatastoreSource> bundles =
+        initialRead
+            .withQuery(query)
+            .getSource()
+            .withMockSplitter(splitter)
+            .splitIntoBundles(1024, testPipelineOptions());
+
+    assertEquals(1, bundles.size());
+    assertEquals(query, bundles.get(0).getQuery());
+    verifyNoMoreInteractions(splitter);
+  }
+
+  /**
+   * Tests that when {@link QuerySplitter} cannot split a query, {@link V1Beta3} falls back to
+   * a single split.
+   */
+  @Test
+  public void testQuerySplitterThrows() throws Exception {
+    // Mock query splitter that throws IllegalArgumentException
+    IllegalArgumentException exception =
+        new IllegalArgumentException("query not supported by splitter");
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(
+            splitter.getSplits(
+                any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class)))
+        .thenThrow(exception);
+
+    Query query = Query.newBuilder().addKind(KindExpression.newBuilder().setName("myKind")).build();
+    List<DatastoreSource> bundles =
+        initialRead
+            .withQuery(query)
+            .getSource()
+            .withMockSplitter(splitter)
+            .withMockEstimateSizeBytes(10240L)
+            .splitIntoBundles(1024, testPipelineOptions());
+
+    assertEquals(1, bundles.size());
+    assertEquals(query, bundles.get(0).getQuery());
+    verify(splitter, times(1))
+        .getSplits(
+            any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class));
+    logged.verifyWarn("Unable to parallelize the given query", exception);
+  }
+
+  @Test
+  public void testQuerySplitSizeUnavailable() throws Exception {
+    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
+    Query query = Query.newBuilder().addKind(mykind).build();
+
+    List<Query> mockSplits = Lists.newArrayList(Query.newBuilder().addKind(mykind).build());
+
+    QuerySplitter splitter = mock(QuerySplitter.class);
+    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(12), any(Datastore.class)))
+        .thenReturn(mockSplits);
+
+    DatastoreSource io = initialRead
+        .withQuery(query)
+        .getSource()
+        .withMockSplitter(splitter)
+        .withMockEstimateSizeBytes(8 * 1024L);
+
+    DatastoreSource spiedIo = spy(io);
+    when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class)))
+        .thenThrow(new NoSuchElementException());
+
+    List<DatastoreSource> bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions());
+    assertEquals(1, bundles.size());
+    verify(splitter, never())
+        .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class));
+    DatastoreSource bundle = bundles.get(0);
+    Query bundleQuery = bundle.getQuery();
+    assertEquals("mykind", bundleQuery.getKind(0).getName());
+    assertFalse(bundleQuery.hasFilter());
+  }
+
+  /**
+   * Test building a Write using builder methods.
+   */
+  @Test
+  public void testBuildWrite() throws Exception {
+    V1Beta3.Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
+    assertEquals(PROJECT_ID, write.getProjectId());
+  }
+
+  /**
+   * Test the detection of complete and incomplete keys.
+   */
+  @Test
+  public void testHasNameOrId() {
+    Key key;
+    // Complete with name, no ancestor
+    key = makeKey("bird", "finch").build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Complete with id, no ancestor
+    key = makeKey("bird", 123).build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Incomplete, no ancestor
+    key = makeKey("bird").build();
+    assertFalse(DatastoreWriter.isValidKey(key));
+
+    // Complete with name and ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", "horned").build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Complete with id and ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", 123).build();
+    assertTrue(DatastoreWriter.isValidKey(key));
+
+    // Incomplete with ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird").build();
+    assertFalse(DatastoreWriter.isValidKey(key));
+
+    key = makeKey().build();
+    assertFalse(DatastoreWriter.isValidKey(key));
+  }
+
+  /**
+   * Test that entities with incomplete keys cannot be written.
+   */
+  @Test
+  public void testAddEntitiesWithIncompleteKeys() throws Exception {
+    Key key = makeKey("bird").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
+
+    writer.write(entity);
+  }
+
+  /**
+   * Test that entities are added to the write batch.
+   */
+  @Test
+  public void testAddingEntities() throws Exception {
+    List<Entity> expected = Lists.newArrayList(
+        Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
+        Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
+        Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
+
+    List<Entity> allEntities = Lists.newArrayList(expected);
+    Collections.shuffle(allEntities);
+
+    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
+    writer.open("test_id");
+    for (Entity entity : allEntities) {
+      writer.write(entity);
+    }
+
+    assertEquals(expected.size(), writer.entities.size());
+    assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
+  }
+
+  /** Datastore batch API limit in number of records per query. */
+  private static final int DATASTORE_QUERY_BATCH_LIMIT = 500;
+
+  /**
+   * A helper function that creates mock {@link Entity} results in response to a query. Always
+   * indicates that more results are available, unless the batch is limited to fewer than
+   * {@link #DATASTORE_QUERY_BATCH_LIMIT} results.
+   */
+  private static RunQueryResponse mockResponseForQuery(Query q) {
+    // Every query V1Beta3 sends should have a limit.
+    assertTrue(q.hasLimit());
+
+    // The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT]
+    int limit = q.getLimit().getValue();
+    assertThat(limit, greaterThanOrEqualTo(1));
+    assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT));
+
+    // Create the requested number of entities.
+    List<EntityResult> entities = new ArrayList<>(limit);
+    for (int i = 0; i < limit; ++i) {
+      entities.add(
+          EntityResult.newBuilder()
+              .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
+              .build());
+    }
+
+    // Fill out the other parameters on the returned result batch.
+    RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
+    ret.getBatchBuilder()
+        .addAllEntityResults(entities)
+        .setEntityResultType(EntityResult.ResultType.FULL)
+        .setMoreResults(
+            limit == DATASTORE_QUERY_BATCH_LIMIT
+                ? QueryResultBatch.MoreResultsType.NOT_FINISHED
+                : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
+
+    return ret.build();
+  }
+
+  /** Helper function to run a test reading from a limited-result query. */
+  private void runQueryLimitReadTest(int numEntities) throws Exception {
+    // A query matching all entities, limited to numEntities results.
+    Query query = Query.newBuilder().setLimit(
+        Int32Value.newBuilder().setValue(numEntities)).build();
+    V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(query).withProjectId("mockProject");
+
+    // Use mockResponseForQuery to generate results.
+    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
+        .thenAnswer(
+            new Answer<RunQueryResponse>() {
+              @Override
+              public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable {
+                Query q = ((RunQueryRequest) invocation.getArguments()[0]).getQuery();
+                return mockResponseForQuery(q);
+              }
+            });
+
+    // Actually instantiate the reader.
+    DatastoreReader reader = new DatastoreReader(read.getSource(), mockDatastore);
+
+    // Simply count the number of results returned by the reader.
+    assertTrue(reader.start());
+    int resultCount = 1;
+    while (reader.advance()) {
+      resultCount++;
+    }
+    reader.close();
+
+    // Validate the number of results.
+    assertEquals(numEntities, resultCount);
+  }
+
+  /** Tests reading with a query limit less than one batch. */
+  @Test
+  public void testReadingWithLimitOneBatch() throws Exception {
+    runQueryLimitReadTest(5);
+  }
+
+  /** Tests reading with a query limit more than one batch, and not a multiple. */
+  @Test
+  public void testReadingWithLimitMultipleBatches() throws Exception {
+    runQueryLimitReadTest(DATASTORE_QUERY_BATCH_LIMIT + 5);
+  }
+
+  /** Tests reading several batches, using an exact multiple of batch size results. */
+  @Test
+  public void testReadingWithLimitMultipleBatchesExactMultiple() throws Exception {
+    runQueryLimitReadTest(5 * DATASTORE_QUERY_BATCH_LIMIT);
+  }
+}
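
For context, a minimal sketch of the user-facing pattern these limit tests
exercise (the pipeline p and the project id "my-project" are assumed): a query
configured with a limit is read by a single worker rather than in parallel.

    Query limited =
        Query.newBuilder()
            .addKind(KindExpression.newBuilder().setName("myKind"))
            .setLimit(Int32Value.newBuilder().setValue(100))
            .build();
    PCollection<Entity> entities = p.apply(
        DatastoreIO.v1beta3().read()
            .withProjectId("my-project")  // assumed project id
            .withQuery(limited));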


[3/5] incubator-beam git commit: Add io-gcp to top-level pom dependency management

Posted by lc...@apache.org.
Add io-gcp to top-level pom dependency management

So its version is inherited everywhere that depends on it


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15dff04d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15dff04d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15dff04d

Branch: refs/heads/master
Commit: 15dff04d9fd73b5443b2213ae1dfe8b998a87c74
Parents: 1cccbac
Author: Dan Halperin <dh...@google.com>
Authored: Sat Jul 2 12:10:10 2016 -0700
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:06:19 2016 -0400

----------------------------------------------------------------------
 pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15dff04d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 72d73fb..7089c2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -271,6 +271,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-core-java</artifactId>
         <version>${project.version}</version>
       </dependency>
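
With this entry in the parent's <dependencyManagement>, any module that depends
on io-gcp can omit the version. A minimal sketch of such a declaration in a
child pom (mirroring the examples/java change in the next commit):

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <!-- no <version> element: inherited from the parent's dependencyManagement -->
    </dependency>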


[2/5] incubator-beam git commit: Move Datastore from sdks/java/core to io/gcp

Posted by lc...@apache.org.
Move Datastore from sdks/java/core to io/gcp

* Remove DataflowDatastoreIOTest. Dataflow runner translation of display
  data should be tested via code in the same module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2ccd685
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2ccd685
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2ccd685

Branch: refs/heads/master
Commit: c2ccd685d82aff06ea4a41e84841e653513c4050
Parents: 15dff04
Author: Dan Halperin <dh...@google.com>
Authored: Sat Jul 2 12:10:34 2016 -0700
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:06:19 2016 -0400

----------------------------------------------------------------------
 examples/java/pom.xml                           |   5 +
 .../beam/examples/complete/AutoComplete.java    |   2 +-
 .../examples/cookbook/DatastoreWordCount.java   |   4 +-
 sdks/java/core/pom.xml                          |  10 -
 .../beam/sdk/io/datastore/DatastoreIO.java      |  41 -
 .../apache/beam/sdk/io/datastore/V1Beta3.java   | 992 -------------------
 .../beam/sdk/io/datastore/package-info.java     |  24 -
 .../beam/sdk/io/datastore/V1Beta3Test.java      | 617 ------------
 sdks/java/io/google-cloud-platform/pom.xml      |  28 +
 .../beam/sdk/io/gcp/datastore/DatastoreIO.java  |  41 +
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 992 +++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/package-info.java |  24 +
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  | 617 ++++++++++++
 13 files changed, 1710 insertions(+), 1687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 0d2f505..ca16f51 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -204,6 +204,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 708aa87..e6cc0cc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.BigQueryIO;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 36af202..847523b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -25,8 +25,8 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 import org.apache.beam.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.datastore.DatastoreIO;
-import org.apache.beam.sdk.io.datastore.V1Beta3;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 9ec8f3d..bda77cb 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -383,16 +383,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.cloud.datastore</groupId>
-      <artifactId>datastore-v1beta3-proto-client</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.cloud.datastore</groupId>
-      <artifactId>datastore-v1beta3-protos</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>gcsio</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java
deleted file mode 100644
index d5043f2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/DatastoreIO.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.datastore;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * <p>{@link DatastoreIO} provides an API for reading from and writing to
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
- * versions of the Datastore Client libraries.
- *
- * <p>To use the v1beta3 version see {@link V1Beta3}.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class DatastoreIO {
-
-  private DatastoreIO() {}
-
-  /**
-   * Returns a {@link V1Beta3} that provides an API for accessing Datastore through v1beta3 version
-   * of Datastore Client library.
-   */
-  public static V1Beta3 v1beta3() {
-    return new V1Beta3();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java
deleted file mode 100644
index 0b9f709..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/V1Beta3.java
+++ /dev/null
@@ -1,992 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Ints;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreHelper;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * <p>{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
- * {@link Entity} objects.
- *
- * <p>This API currently requires an authentication workaround. To use {@link V1Beta3}, users
- * must use the {@code gcloud} command line tool to get credentials for Datastore:
- * <pre>
- * $ gcloud auth login
- * </pre>
- *
- * <p>To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and
- * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to
- * specify the project to query and the query to read from. You can optionally provide a namespace
- * to query within using {@link V1Beta3.Read#withNamespace}.
- *
- * <p>For example:
- *
- * <pre> {@code
- * // Read a query from Datastore
- * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- * Query query = ...;
- * String projectId = "...";
- *
- * Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(
- *     DatastoreIO.v1beta3().read()
- *         .withProjectId(projectId)
- *         .withQuery(query));
- * } </pre>
- *
- * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
- * many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
- * all returned results will be read by a single Dataflow worker in order to ensure correct data.
- *
- * <p>To write a {@link PCollection} to a Datastore, use {@link V1Beta3#write},
- * specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
- * {@link Key Keys}. A complete {@code Key} specifies the {@code name} or {@code id} of its
- * last path element; an incomplete {@code Key} does not. A {@code namespace} other than the
- * project default may be used by specifying it in the {@code Entity} {@code Keys}.
- *
- * <pre>{@code
- * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
- * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
- * }</pre>
- *
- * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read
- * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and
- * Keys</a> for more information about {@code Entity} keys.
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
- * more details.
- *
- * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore
- * Sign Up</a> for security and permission-related information specific to Datastore.
- *
- * @see org.apache.beam.sdk.runners.PipelineRunner
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class V1Beta3 {
-
-  // A package-private constructor to prevent direct instantiation from outside of this package
-  V1Beta3() {}
-
-  /**
-   * Datastore has a limit of 500 mutations per batch operation, so we flush
-   * changes to Datastore every 500 entities.
-   */
-  private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
-
-  /**
-   * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
-   * {@code query}, and optionally {@code namespace} using {@link V1Beta3.Read#withProjectId},
-   * {@link V1Beta3.Read#withQuery}, and {@link V1Beta3.Read#withNamespace}.
-   */
-  public V1Beta3.Read read() {
-    return new V1Beta3.Read(null, null, null);
-  }
-
-  /**
-   * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity}
-   * objects.
-   *
-   * @see DatastoreIO
-   */
-  public static class Read extends PTransform<PBegin, PCollection<Entity>> {
-    @Nullable
-    private final String projectId;
-
-    @Nullable
-    private final Query query;
-
-    @Nullable
-    private final String namespace;
-
-    /**
-     * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
-     * {@code null} as a matter of build order, but if they are {@code null} at instantiation time,
-     * an error will be thrown.
-     */
-    private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace) {
-      this.projectId = projectId;
-      this.query = query;
-      this.namespace = namespace;
-    }
-
-    /**
-     * Returns a new {@link V1Beta3.Read} that reads from the Datastore for the specified project.
-     */
-    public V1Beta3.Read withProjectId(String projectId) {
-      checkNotNull(projectId, "projectId");
-      return new V1Beta3.Read(projectId, query, namespace);
-    }
-
-    /**
-     * Returns a new {@link V1Beta3.Read} that reads the results of the specified query.
-     *
-     * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
-     * across many workers. However, when the {@link Query} is configured with a limit using
-     * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
-     * to ensure correct results.
-     */
-    public V1Beta3.Read withQuery(Query query) {
-      checkNotNull(query, "query");
-      checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
-          "Invalid query limit %s: must be positive", query.getLimit().getValue());
-      return new V1Beta3.Read(projectId, query, namespace);
-    }
-
-    /**
-     * Returns a new {@link V1Beta3.Read} that reads from the given namespace.
-     */
-    public V1Beta3.Read withNamespace(String namespace) {
-      return new V1Beta3.Read(projectId, query, namespace);
-    }
-
-    @Nullable
-    public Query getQuery() {
-      return query;
-    }
-
-    @Nullable
-    public String getProjectId() {
-      return projectId;
-    }
-
-    @Nullable
-    public String getNamespace() {
-      return namespace;
-    }
-
-    @Override
-    public PCollection<Entity> apply(PBegin input) {
-      return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
-    }
-
-    @Override
-    public void validate(PBegin input) {
-      checkNotNull(projectId, "projectId");
-      checkNotNull(query, "query");
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("ProjectId"))
-          .addIfNotNull(DisplayData.item("namespace", namespace)
-              .withLabel("Namespace"))
-          .addIfNotNull(DisplayData.item("query", query.toString())
-              .withLabel("Query"));
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("projectId", projectId)
-          .add("query", query)
-          .add("namespace", namespace)
-          .toString();
-    }
-
-    @VisibleForTesting
-    DatastoreSource getSource() {
-      return new DatastoreSource(projectId, query, namespace);
-    }
-  }
-
-  /**
-   * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
-   * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
-   */
-  public Write write() {
-    return new Write(null);
-  }
-
-  /**
-   * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
-   *
-   * @see DatastoreIO
-   */
-  public static class Write extends PTransform<PCollection<Entity>, PDone> {
-    @Nullable
-    private final String projectId;
-
-    /**
-     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
-     * it is {@code null} at instantiation time, an error will be thrown.
-     */
-    public Write(@Nullable String projectId) {
-      this.projectId = projectId;
-    }
-
-    /**
-     * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project.
-     */
-    public Write withProjectId(String projectId) {
-      checkNotNull(projectId, "projectId");
-      return new Write(projectId);
-    }
-
-    @Override
-    public PDone apply(PCollection<Entity> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
-    }
-
-    @Override
-    public void validate(PCollection<Entity> input) {
-      checkNotNull(projectId, "projectId");
-    }
-
-    @Nullable
-    public String getProjectId() {
-      return projectId;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("projectId", projectId)
-          .toString();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"));
-    }
-  }
-
-  /**
-   * A {@link org.apache.beam.sdk.io.Source} that reads data from Datastore.
-   */
-  static class DatastoreSource extends BoundedSource<Entity> {
-
-    @Override
-    public Coder<Entity> getDefaultOutputCoder() {
-      return ProtoCoder.of(Entity.class);
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) {
-      return false;
-    }
-
-    @Override
-    public List<DatastoreSource> splitIntoBundles(long desiredBundleSizeBytes,
-        PipelineOptions options) throws Exception {
-      // Users may request a limit on the number of results. We can currently support this by
-      // simply disabling parallel reads and using only a single split.
-      if (query.hasLimit()) {
-        return ImmutableList.of(this);
-      }
-
-      long numSplits;
-      try {
-        numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes);
-      } catch (Exception e) {
-        // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible.
-        numSplits = 12;
-      }
-
-      // If the desiredBundleSize or number of workers results in 1 split, simply return
-      // a source that reads from the original query.
-      if (numSplits <= 1) {
-        return ImmutableList.of(this);
-      }
-
-      List<Query> datastoreSplits;
-      try {
-        datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options);
-      } catch (IllegalArgumentException | DatastoreException e) {
-        LOG.warn("Unable to parallelize the given query: {}", query, e);
-        return ImmutableList.of(this);
-      }
-
-      ImmutableList.Builder<DatastoreSource> splits = ImmutableList.builder();
-      for (Query splitQuery : datastoreSplits) {
-        splits.add(new DatastoreSource(projectId, splitQuery, namespace));
-      }
-      return splits.build();
-    }
-
-    @Override
-    public BoundedReader<Entity> createReader(PipelineOptions pipelineOptions) throws IOException {
-      return new DatastoreReader(this, getDatastore(pipelineOptions));
-    }
-
-    @Override
-    public void validate() {
-      checkNotNull(query, "query");
-      checkNotNull(projectId, "projectId");
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      // Datastore provides no way to get a good estimate of how large the result of a query
-      // will be. As a rough approximation, we attempt to fetch the statistics of the whole
-      // entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind
-      // is specified in the query.
-      //
-      // See https://cloud.google.com/datastore/docs/concepts/stats
-      if (mockEstimateSizeBytes != null) {
-        return mockEstimateSizeBytes;
-      }
-
-      Datastore datastore = getDatastore(options);
-      if (query.getKindCount() != 1) {
-        throw new UnsupportedOperationException(
-            "Can only estimate size for queries specifying exactly 1 kind.");
-      }
-      String ourKind = query.getKind(0).getName();
-      long latestTimestamp = queryLatestStatisticsTimestamp(datastore);
-      Query.Builder query = Query.newBuilder();
-      if (namespace == null) {
-        query.addKindBuilder().setName("__Stat_Kind__");
-      } else {
-        query.addKindBuilder().setName("__Ns_Stat_Kind__");
-      }
-      query.setFilter(makeAndFilter(
-          makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
-          makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
-      RunQueryRequest request = makeRequest(query.build());
-
-      long now = System.currentTimeMillis();
-      RunQueryResponse response = datastore.runQuery(request);
-      LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
-
-      QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultsCount() == 0) {
-        throw new NoSuchElementException(
-            "Datastore statistics for kind " + ourKind + " unavailable");
-      }
-      Entity entity = batch.getEntityResults(0).getEntity();
-      return entity.getProperties().get("entity_bytes").getIntegerValue();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("ProjectId"))
-          .addIfNotNull(DisplayData.item("namespace", namespace)
-              .withLabel("Namespace"))
-          .addIfNotNull(DisplayData.item("query", query.toString())
-              .withLabel("Query"));
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("projectId", projectId)
-          .add("query", query)
-          .add("namespace", namespace)
-          .toString();
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreSource.class);
-    private final String projectId;
-    private final Query query;
-    @Nullable
-    private final String namespace;
-
-    /** For testing only. TODO: This could be much cleaner with dependency injection. */
-    @Nullable
-    private QuerySplitter mockSplitter;
-    @Nullable
-    private Long mockEstimateSizeBytes;
-
-    DatastoreSource(String projectId, Query query, @Nullable String namespace) {
-      this.projectId = projectId;
-      this.query = query;
-      this.namespace = namespace;
-    }
-
-    /**
-     * A helper function to get the split queries, taking into account the optional
-     * {@code namespace} and whether there is a mock splitter.
-     */
-    private List<Query> getSplitQueries(int numSplits, PipelineOptions options)
-        throws DatastoreException {
-      // If namespace is set, include it in the split request so splits are calculated accordingly.
-      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
-      if (namespace != null) {
-        partitionBuilder.setNamespaceId(namespace);
-      }
-
-      if (mockSplitter != null) {
-        // For testing.
-        return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null);
-      }
-
-      return DatastoreHelper.getQuerySplitter().getSplits(
-          query, partitionBuilder.build(), numSplits, getDatastore(options));
-    }
-
-    /**
-     * Builds a {@link RunQueryRequest} from the {@code query}, using the properties set on this
-     * {@code DatastoreSource}. For example, sets the {@code namespace} for the request.
-     */
-    private RunQueryRequest makeRequest(Query query) {
-      RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
-      if (namespace != null) {
-        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-      }
-      return requestBuilder.build();
-    }
-
-    /**
-     * Datastore system tables with statistics are periodically updated. This method fetches
-     * the latest timestamp of statistics update using the {@code __Stat_Total__} table.
-     */
-    private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException {
-      Query.Builder query = Query.newBuilder();
-      query.addKindBuilder().setName("__Stat_Total__");
-      query.addOrder(makeOrder("timestamp", DESCENDING));
-      query.setLimit(Int32Value.newBuilder().setValue(1));
-      RunQueryRequest request = makeRequest(query.build());
-
-      long now = System.currentTimeMillis();
-      RunQueryResponse response = datastore.runQuery(request);
-      LOG.info("Query for latest stats timestamp of project {} took {}ms", projectId,
-          System.currentTimeMillis() - now);
-      QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultsCount() == 0) {
-        throw new NoSuchElementException(
-            "Datastore total statistics for project " + projectId + " unavailable");
-      }
-      Entity entity = batch.getEntityResults(0).getEntity();
-      return entity.getProperties().get("timestamp").getTimestampValue().getNanos();
-    }
-
-    private Datastore getDatastore(PipelineOptions pipelineOptions) {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder()
-              .projectId(projectId)
-              .initializer(
-                  new RetryHttpRequestInitializer()
-              );
-
-      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
-      }
-      return DatastoreFactory.get().create(builder.build());
-    }
-
-    /** For testing only. */
-    DatastoreSource withMockSplitter(QuerySplitter splitter) {
-      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
-      res.mockSplitter = splitter;
-      res.mockEstimateSizeBytes = mockEstimateSizeBytes;
-      return res;
-    }
-
-    /** For testing only. */
-    DatastoreSource withMockEstimateSizeBytes(Long estimateSizeBytes) {
-      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
-      res.mockSplitter = mockSplitter;
-      res.mockEstimateSizeBytes = estimateSizeBytes;
-      return res;
-    }
-
-    @VisibleForTesting
-    Query getQuery() {
-      return query;
-    }
-  }
-
-  /**
-   * A {@link DatastoreSource.Reader} over the records from a query of the datastore.
-   *
-   * <p>Timestamped records are currently not supported.
-   * All records implicitly have the timestamp of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
-   */
-  @VisibleForTesting
-  static class DatastoreReader extends BoundedSource.BoundedReader<Entity> {
-    private final DatastoreSource source;
-
-    /**
-     * Datastore to read from.
-     */
-    private final Datastore datastore;
-
-    /**
-     * True if more results may be available.
-     */
-    private boolean moreResults;
-
-    /**
-     * Iterator over records.
-     */
-    private java.util.Iterator<EntityResult> entities;
-
-    /**
-     * Current batch of query results.
-     */
-    private QueryResultBatch currentBatch;
-
-    /**
-     * Maximum number of results to request per query.
-     *
-     * <p>Must be set, or it may result in an I/O error when querying
-     * Cloud Datastore.
-     */
-    private static final int QUERY_BATCH_LIMIT = 500;
-
-    /**
-     * Remaining user-requested limit on the number of results to return. If the user did not set a
-     * limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will never
-     * be decremented.
-     */
-    private int userLimit;
-
-    private volatile boolean done = false;
-
-    private Entity currentEntity;
-
-    /**
-     * Returns a DatastoreReader with DatastoreSource and Datastore object set.
-     *
-     * @param datastore a datastore connection to use.
-     */
-    public DatastoreReader(DatastoreSource source, Datastore datastore) {
-      this.source = source;
-      this.datastore = datastore;
-      // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE.
-      userLimit = source.query.hasLimit()
-          ? source.query.getLimit().getValue() : Integer.MAX_VALUE;
-    }
-
-    @Override
-    public Entity getCurrent() {
-      return currentEntity;
-    }
-
-    @Override
-    public final long getSplitPointsConsumed() {
-      return done ? 1 : 0;
-    }
-
-    @Override
-    public final long getSplitPointsRemaining() {
-      return done ? 0 : 1;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      if (entities == null || (!entities.hasNext() && moreResults)) {
-        try {
-          entities = getIteratorAndMoveCursor();
-        } catch (DatastoreException e) {
-          throw new IOException(e);
-        }
-      }
-
-      if (entities == null || !entities.hasNext()) {
-        currentEntity = null;
-        done = true;
-        return false;
-      }
-
-      currentEntity = entities.next().getEntity();
-      return true;
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Nothing
-    }
-
-    @Override
-    public DatastoreSource getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public DatastoreSource splitAtFraction(double fraction) {
-      // Not supported.
-      return null;
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      // Not supported.
-      return null;
-    }
-
-    /**
-     * Returns an iterator over the next batch of records for the query
-     * and updates the cursor to get the next batch as needed.
-     * Each request is capped at the lesser of the remaining user limit
-     * and {@link #QUERY_BATCH_LIMIT}.
-     */
-    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
-      Query.Builder query = source.query.toBuilder().clone();
-      query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT)));
-      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
-        query.setStartCursor(currentBatch.getEndCursor());
-      }
-
-      RunQueryRequest request = source.makeRequest(query.build());
-      RunQueryResponse response = datastore.runQuery(request);
-
-      currentBatch = response.getBatch();
-
-      // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
-      // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
-      // use result count to determine if more results might exist.
-      int numFetch = currentBatch.getEntityResultsCount();
-      if (source.query.hasLimit()) {
-        verify(userLimit >= numFetch,
-            "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
-            userLimit, numFetch, query.getLimit());
-        userLimit -= numFetch;
-      }
-      moreResults =
-          // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied.
-          (userLimit > 0)
-              // All indications from the API are that there are/may be more results.
-              && ((numFetch == QUERY_BATCH_LIMIT)
-              || (currentBatch.getMoreResults() == NOT_FINISHED));
-
-      // May receive a batch of 0 results if the number of records is a multiple
-      // of the request limit.
-      if (numFetch == 0) {
-        return null;
-      }
-
-      return currentBatch.getEntityResultsList().iterator();
-    }
-  }
-
-  /**
-   * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore.
-   */
-  static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
-    final String projectId;
-
-    public DatastoreSink(String projectId) {
-      this.projectId = projectId;
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(projectId, "projectId");
-    }
-
-    @Override
-    public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
-      return new DatastoreWriteOperation(this);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"));
-    }
-  }
-
-  /**
-   * A {@link WriteOperation} that will manage a parallel write to a Datastore sink.
-   */
-  private static class DatastoreWriteOperation
-      extends WriteOperation<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
-
-    private final DatastoreSink sink;
-
-    public DatastoreWriteOperation(DatastoreSink sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public Coder<DatastoreWriteResult> getWriterResultCoder() {
-      return SerializableCoder.of(DatastoreWriteResult.class);
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {}
-
-    /**
-     * Finalizes the write.  Logs the number of entities written to the Datastore.
-     */
-    @Override
-    public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
-        throws Exception {
-      long totalEntities = 0;
-      for (DatastoreWriteResult result : writerResults) {
-        totalEntities += result.entitiesWritten;
-      }
-      LOG.info("Wrote {} elements.", totalEntities);
-    }
-
-    @Override
-    public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder()
-              .projectId(sink.projectId)
-              .initializer(new RetryHttpRequestInitializer());
-      Credential credential = options.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
-      }
-      Datastore datastore = DatastoreFactory.get().create(builder.build());
-
-      return new DatastoreWriter(this, datastore);
-    }
-
-    @Override
-    public DatastoreSink getSink() {
-      return sink;
-    }
-  }
-
-  /**
-   * {@link Writer} that writes entities to a Datastore Sink.  Entities are written in batches,
-   * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.  Entities
-   * are committed as upsert mutations (either update if the key already exists, or insert if it is
-   * a new key).  If an entity does not have a complete key (i.e., it has no name or id), the bundle
-   * will fail.
-   *
-   * <p>See <a
-   * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
-   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
-   * mutations.
-   *
-   * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
-   * group, the commit will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
-   * times).
-   *
-   * <p>Visible for testing purposes.
-   */
-  @VisibleForTesting
-  static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
-    private final DatastoreWriteOperation writeOp;
-    private final Datastore datastore;
-    private long totalWritten = 0;
-
-    // Visible for testing.
-    final List<Entity> entities = new ArrayList<>();
-
-    /**
-     * Since a bundle is written in batches, we should retry the commit of a batch in order to
-     * prevent transient errors from causing the bundle to fail.
-     */
-    private static final int MAX_RETRIES = 5;
-
-    /**
-     * Initial backoff time for exponential backoff for retry attempts.
-     */
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
-    /**
-     * Returns true if a Datastore key is complete.  A key is complete if its last element
-     * has either an id or a name.
-     */
-    static boolean isValidKey(Key key) {
-      List<PathElement> elementList = key.getPathList();
-      if (elementList.isEmpty()) {
-        return false;
-      }
-      PathElement lastElement = elementList.get(elementList.size() - 1);
-      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
-    }
-
-    DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
-      this.writeOp = writeOp;
-      this.datastore = datastore;
-    }
-
-    @Override
-    public void open(String uId) throws Exception {}
-
-    /**
-     * Writes an entity to the Datastore.  Writes are batched, up to {@link
-     * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
-     * {@link IllegalArgumentException} will be thrown.
-     */
-    @Override
-    public void write(Entity value) throws Exception {
-      // Verify that the entity to write has a complete key.
-      if (!isValidKey(value.getKey())) {
-        throw new IllegalArgumentException(
-            "Entities to be written to the Datastore must have complete keys");
-      }
-
-      entities.add(value);
-
-      if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
-        flushBatch();
-      }
-    }
-
-    /**
-     * Flushes any pending batch writes and returns a DatastoreWriteResult.
-     */
-    @Override
-    public DatastoreWriteResult close() throws Exception {
-      if (entities.size() > 0) {
-        flushBatch();
-      }
-      return new DatastoreWriteResult(totalWritten);
-    }
-
-    @Override
-    public DatastoreWriteOperation getWriteOperation() {
-      return writeOp;
-    }
-
-    /**
-     * Writes a batch of entities to the Datastore.
-     *
-     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
-     * times).  All entities in the batch will be committed again, even if the commit was partially
-     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
-     * thrown.
-     *
-     * @throws DatastoreException if the commit fails after all retries
-     * @throws IOException if backing off between retries fails
-     * @throws InterruptedException if interrupted while backing off between retries
-     */
-    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-      LOG.debug("Writing batch of {} entities", entities.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-      while (true) {
-        // Batch upsert entities.
-        try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          for (Entity entity: entities) {
-            commitRequest.addMutations(makeUpsert(entity));
-          }
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-          datastore.commit(commitRequest.build());
-          // Break if the commit threw no exception.
-          break;
-        } catch (DatastoreException exception) {
-          // Only log the code and message for potentially-transient errors. The entire exception
-          // will be propagated upon the last retry.
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      totalWritten += entities.size();
-      LOG.debug("Successfully wrote {} entities", entities.size());
-      entities.clear();
-    }
-  }
-
-  private static class DatastoreWriteResult implements Serializable {
-    final long entitiesWritten;
-
-    public DatastoreWriteResult(long recordsWritten) {
-      this.entitiesWritten = recordsWritten;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java
deleted file mode 100644
index f687739..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/datastore/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <p>Provides an API for reading from and writing to
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
- * versions of the Datastore Client libraries.
- */
-package org.apache.beam.sdk.io.datastore;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
deleted file mode 100644
index dd22289..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
+++ /dev/null
@@ -1,617 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.datastore;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.datastore.V1Beta3.DatastoreReader;
-import org.apache.beam.sdk.io.datastore.V1Beta3.DatastoreSource;
-import org.apache.beam.sdk.io.datastore.V1Beta3.DatastoreWriter;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.collect.Lists;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.KindExpression;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.Value;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreHelper;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * Tests for {@link V1Beta3}.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3Test {
-  private static final String PROJECT_ID = "testProject";
-  private static final String NAMESPACE = "testNamespace";
-  private static final String KIND = "testKind";
-  private static final Query QUERY;
-  static {
-    Query.Builder q = Query.newBuilder();
-    q.addKindBuilder().setName(KIND);
-    QUERY = q.build();
-  }
-  private V1Beta3.Read initialRead;
-
-  @Mock
-  Datastore mockDatastore;
-
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-
-  @Rule public final ExpectedLogs logged = ExpectedLogs.none(DatastoreSource.class);
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    initialRead = DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-  }
-
-  /**
-   * Helper function to create a test {@link GcpOptions}.
-   */
-  static GcpOptions testPipelineOptions() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-    options.setGcpCredential(new TestCredential());
-    return options;
-  }
-
-  @Test
-  public void testBuildRead() throws Exception {
-    V1Beta3.Read read = DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
-  }
-
-  /**
-   * {@link #testBuildRead} but constructed in a different order.
-   */
-  @Test
-  public void testBuildReadAlt() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
-  }
-
-  @Test
-  public void testReadValidationFailsProject() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withQuery(QUERY);
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("project");
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQuery() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("query");
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0: must be positive");
-
-    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5: must be positive");
-
-    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationSucceedsNamespace() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
-    /* Should succeed, as a null namespace is fine. */
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
-      .withProjectId(PROJECT_ID)
-      .withQuery(QUERY)
-      .withNamespace(NAMESPACE);
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-    assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
-    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSourcePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
-        "myProject").withQuery(Query.newBuilder().build());
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("DatastoreIO read should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-  }
-
-  @Test
-  public void testWriteDoesNotAllowNullProject() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    DatastoreIO.v1beta3().write().withProjectId(null);
-  }
-
-  @Test
-  public void testWriteValidationFailsWithNoProject() throws Exception {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write();
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    write.validate(null);
-  }
-
-  @Test
-  public void testSinkValidationSucceedsWithProject() throws Exception {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-    write.validate(null);
-  }
-
-  @Test
-  public void testWriteDisplayData() {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write()
-        .withProjectId(PROJECT_ID);
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSinkPrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1beta3().write().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-  }
-
-  @Test
-  public void testQuerySplitBasic() throws Exception {
-    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
-    Query query = Query.newBuilder().addKind(mykind).build();
-
-    List<Query> mockSplits = new ArrayList<>();
-    for (int i = 0; i < 8; ++i) {
-      mockSplits.add(
-          Query.newBuilder()
-              .addKind(mykind)
-              .setFilter(
-                  DatastoreHelper.makeFilter("foo", PropertyFilter.Operator.EQUAL,
-                      Value.newBuilder().setIntegerValue(i).build()))
-              .build());
-    }
-
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    /* No namespace */
-    PartitionId partition = PartitionId.newBuilder().build();
-    when(splitter.getSplits(any(Query.class), eq(partition), eq(8), any(Datastore.class)))
-        .thenReturn(mockSplits);
-
-    DatastoreSource io = initialRead
-        .withNamespace(null)
-        .withQuery(query)
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(8 * 1024L);
-
-    List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
-    assertEquals(8, bundles.size());
-    for (int i = 0; i < 8; ++i) {
-      DatastoreSource bundle = bundles.get(i);
-      Query bundleQuery = bundle.getQuery();
-      assertEquals("mykind", bundleQuery.getKind(0).getName());
-      assertEquals(i, bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue());
-    }
-  }
-
-  /**
-   * Verifies that when namespace is set in the source, the split request includes the namespace.
-   */
-  @Test
-  public void testSourceWithNamespace() throws Exception {
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    DatastoreSource io = initialRead
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(8 * 1024L);
-
-    io.splitIntoBundles(1024, testPipelineOptions());
-
-    PartitionId partition = PartitionId.newBuilder().setNamespaceId(NAMESPACE).build();
-    verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class));
-    verifyNoMoreInteractions(splitter);
-  }
-
-  @Test
-  public void testQuerySplitWithZeroSize() throws Exception {
-    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
-    Query query = Query.newBuilder().addKind(mykind).build();
-
-    List<Query> mockSplits = Lists.newArrayList(
-        Query.newBuilder()
-            .addKind(mykind)
-            .build());
-
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)))
-        .thenReturn(mockSplits);
-
-    DatastoreSource io = initialRead
-        .withQuery(query)
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(0L);
-
-    List<DatastoreSource> bundles = io.splitIntoBundles(1024, testPipelineOptions());
-    assertEquals(1, bundles.size());
-    verify(splitter, never())
-        .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class));
-    DatastoreSource bundle = bundles.get(0);
-    Query bundleQuery = bundle.getQuery();
-    assertEquals("mykind", bundleQuery.getKind(0).getName());
-    assertFalse(bundleQuery.hasFilter());
-  }
-
-  /**
-   * Tests that a query with a user-provided limit field does not split, and does not even
-   * interact with a query splitter.
-   */
-  @Test
-  public void testQueryDoesNotSplitWithLimitSet() throws Exception {
-    // Minimal query with a limit
-    Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build();
-
-    // Mock query splitter, should not be invoked.
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), any(Datastore.class)))
-        .thenThrow(new AssertionError("Splitter should not be invoked"));
-
-    List<DatastoreSource> bundles =
-        initialRead
-            .withQuery(query)
-            .getSource()
-            .withMockSplitter(splitter)
-            .splitIntoBundles(1024, testPipelineOptions());
-
-    assertEquals(1, bundles.size());
-    assertEquals(query, bundles.get(0).getQuery());
-    verifyNoMoreInteractions(splitter);
-  }
-
-  /**
-   * Tests that when {@link QuerySplitter} cannot split a query, {@link V1Beta3} falls back to
-   * a single split.
-   */
-  @Test
-  public void testQuerySplitterThrows() throws Exception {
-    // Mock query splitter that throws IllegalArgumentException
-    IllegalArgumentException exception =
-        new IllegalArgumentException("query not supported by splitter");
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(
-            splitter.getSplits(
-                any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class)))
-        .thenThrow(exception);
-
-    Query query = Query.newBuilder().addKind(KindExpression.newBuilder().setName("myKind")).build();
-    List<DatastoreSource> bundles =
-        initialRead
-            .withQuery(query)
-            .getSource()
-            .withMockSplitter(splitter)
-            .withMockEstimateSizeBytes(10240L)
-            .splitIntoBundles(1024, testPipelineOptions());
-
-    assertEquals(1, bundles.size());
-    assertEquals(query, bundles.get(0).getQuery());
-    verify(splitter, times(1))
-        .getSplits(
-            any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class));
-    logged.verifyWarn("Unable to parallelize the given query", exception);
-  }
-
-  @Test
-  public void testQuerySplitSizeUnavailable() throws Exception {
-    KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
-    Query query = Query.newBuilder().addKind(mykind).build();
-
-    List<Query> mockSplits = Lists.newArrayList(Query.newBuilder().addKind(mykind).build());
-
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(12), any(Datastore.class)))
-        .thenReturn(mockSplits);
-
-    DatastoreSource io = initialRead
-        .withQuery(query)
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(8 * 1024L);
-
-    DatastoreSource spiedIo = spy(io);
-    when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class)))
-        .thenThrow(new NoSuchElementException());
-
-    List<DatastoreSource> bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions());
-    assertEquals(1, bundles.size());
-    verify(splitter, never())
-        .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class));
-    DatastoreSource bundle = bundles.get(0);
-    Query bundleQuery = bundle.getQuery();
-    assertEquals("mykind", bundleQuery.getKind(0).getName());
-    assertFalse(bundleQuery.hasFilter());
-  }
-
-  /**
-   * Test building a Write using builder methods.
-   */
-  @Test
-  public void testBuildWrite() throws Exception {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-    assertEquals(PROJECT_ID, write.getProjectId());
-  }
-
-  /**
-   * Test the detection of complete and incomplete keys.
-   */
-  @Test
-  public void testHasNameOrId() {
-    Key key;
-    // Complete with name, no ancestor
-    key = makeKey("bird", "finch").build();
-    assertTrue(DatastoreWriter.isValidKey(key));
-
-    // Complete with id, no ancestor
-    key = makeKey("bird", 123).build();
-    assertTrue(DatastoreWriter.isValidKey(key));
-
-    // Incomplete, no ancestor
-    key = makeKey("bird").build();
-    assertFalse(DatastoreWriter.isValidKey(key));
-
-    // Complete with name and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", "horned").build();
-    assertTrue(DatastoreWriter.isValidKey(key));
-
-    // Complete with id and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", 123).build();
-    assertTrue(DatastoreWriter.isValidKey(key));
-
-    // Incomplete with ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird").build();
-    assertFalse(DatastoreWriter.isValidKey(key));
-
-    key = makeKey().build();
-    assertFalse(DatastoreWriter.isValidKey(key));
-  }
-
-  /**
-   * Test that entities with incomplete keys cannot be updated.
-   */
-  @Test
-  public void testAddEntitiesWithIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
-
-    writer.write(entity);
-  }
-
-  /**
-   * Test that entities are added to the batch to update.
-   */
-  @Test
-  public void testAddingEntities() throws Exception {
-    List<Entity> expected = Lists.newArrayList(
-        Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
-        Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
-        Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
-
-    List<Entity> allEntities = Lists.newArrayList(expected);
-    Collections.shuffle(allEntities);
-
-    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
-    writer.open("test_id");
-    for (Entity entity : allEntities) {
-      writer.write(entity);
-    }
-
-    assertEquals(expected.size(), writer.entities.size());
-    assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
-  }
-
-  /** Datastore batch API limit in number of records per query. */
-  private static final int DATASTORE_QUERY_BATCH_LIMIT = 500;
-
-  /**
-   * A helper function that creates mock {@link Entity} results in response to a query. Always
-   * indicates that more results are available, unless the batch is limited to fewer than
-   * {@link #DATASTORE_QUERY_BATCH_LIMIT} results.
-   */
-  private static RunQueryResponse mockResponseForQuery(Query q) {
-    // Every query V1Beta3 sends should have a limit.
-    assertTrue(q.hasLimit());
-
-    // The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT]
-    int limit = q.getLimit().getValue();
-    assertThat(limit, greaterThanOrEqualTo(1));
-    assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT));
-
-    // Create the requested number of entities.
-    List<EntityResult> entities = new ArrayList<>(limit);
-    for (int i = 0; i < limit; ++i) {
-      entities.add(
-          EntityResult.newBuilder()
-              .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
-              .build());
-    }
-
-    // Fill out the other parameters on the returned result batch.
-    RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
-    ret.getBatchBuilder()
-        .addAllEntityResults(entities)
-        .setEntityResultType(EntityResult.ResultType.FULL)
-        .setMoreResults(
-            limit == DATASTORE_QUERY_BATCH_LIMIT
-                ? QueryResultBatch.MoreResultsType.NOT_FINISHED
-                : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
-
-    return ret.build();
-  }
-
-  /** Helper function to run a test reading from a limited-result query. */
-  private void runQueryLimitReadTest(int numEntities) throws Exception {
-    // An empty query to read entities.
-    Query query = Query.newBuilder().setLimit(
-        Int32Value.newBuilder().setValue(numEntities)).build();
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withQuery(query).withProjectId("mockProject");
-
-    // Use mockResponseForQuery to generate results.
-    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
-        .thenAnswer(
-            new Answer<RunQueryResponse>() {
-              @Override
-              public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable {
-                Query q = ((RunQueryRequest) invocation.getArguments()[0]).getQuery();
-                return mockResponseForQuery(q);
-              }
-            });
-
-    // Actually instantiate the reader.
-    DatastoreReader reader = new DatastoreReader(read.getSource(), mockDatastore);
-
-    // Simply count the number of results returned by the reader.
-    assertTrue(reader.start());
-    int resultCount = 1;
-    while (reader.advance()) {
-      resultCount++;
-    }
-    reader.close();
-
-    // Validate the number of results.
-    assertEquals(numEntities, resultCount);
-  }
-
-  /** Tests reading with a query limit less than one batch. */
-  @Test
-  public void testReadingWithLimitOneBatch() throws Exception {
-    runQueryLimitReadTest(5);
-  }
-
-  /** Tests reading with a query limit more than one batch, and not a multiple. */
-  @Test
-  public void testReadingWithLimitMultipleBatches() throws Exception {
-    runQueryLimitReadTest(DATASTORE_QUERY_BATCH_LIMIT + 5);
-  }
-
-  /** Tests reading several batches, using an exact multiple of batch size results. */
-  @Test
-  public void testReadingWithLimitMultipleBatchesExactMultiple() throws Exception {
-    runQueryLimitReadTest(5 * DATASTORE_QUERY_BATCH_LIMIT);
-  }
-}
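
To make the batching contract above concrete, here is a minimal sketch of the
pagination loop that tests such as runQueryLimitReadTest exercise. This is
illustrative only, not the SDK's DatastoreReader implementation: datastore,
query, and userLimit are assumed inputs, and cursor handling is omitted (the
mocked responses above do not use cursors either).

    import com.google.datastore.v1beta3.Query;
    import com.google.datastore.v1beta3.QueryResultBatch;
    import com.google.datastore.v1beta3.RunQueryRequest;
    import com.google.datastore.v1beta3.RunQueryResponse;
    import com.google.datastore.v1beta3.client.Datastore;
    import com.google.datastore.v1beta3.client.DatastoreException;
    import com.google.protobuf.Int32Value;

    /** Illustrative only: reads up to userLimit entities in batches of at most 500. */
    static int readWithLimit(Datastore datastore, Query query, int userLimit)
        throws DatastoreException {
      final int batchCap = 500;  // mirrors DATASTORE_QUERY_BATCH_LIMIT in the test
      int read = 0;
      boolean more = true;
      while (more && read < userLimit) {
        // Every request carries a limit in [1, batchCap], as mockResponseForQuery asserts.
        Query batchQuery = query.toBuilder()
            .setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit - read, batchCap)))
            .build();
        RunQueryResponse response = datastore.runQuery(
            RunQueryRequest.newBuilder().setQuery(batchQuery).build());
        read += response.getBatch().getEntityResultsCount();
        // Keep going only while the service reports NOT_FINISHED.
        more = response.getBatch().getMoreResults()
            == QueryResultBatch.MoreResultsType.NOT_FINISHED;
      }
      return read;
    }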

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 0a814c1..93165ff 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -103,6 +103,18 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
+    <!-- Build dependencies -->
+
+    <dependency>
+      <groupId>com.google.cloud.datastore</groupId>
+      <artifactId>datastore-v1beta3-proto-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.datastore</groupId>
+      <artifactId>datastore-v1beta3-protos</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-all</artifactId>
@@ -133,6 +145,16 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.oauth-client</groupId>
+      <artifactId>google-oauth-client</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -189,6 +211,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2ccd685/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
new file mode 100644
index 0000000..bde0aba
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * <p>{@link DatastoreIO} provides an API for reading from and writing to
+ * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> using
+ * different versions of the Datastore client library.
+ *
+ * <p>To use the v1beta3 version, see {@link V1Beta3}.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class DatastoreIO {
+
+  private DatastoreIO() {}
+
+  /**
+   * Returns a {@link V1Beta3} that provides an API for accessing Datastore through the v1beta3
+   * version of the Datastore client library.
+   */
+  public static V1Beta3 v1beta3() {
+    return new V1Beta3();
+  }
+}
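
For orientation, a minimal usage sketch assembled only from the builder methods
exercised in this patch. It is a hypothetical example, not documentation: the
pipeline p, the project id, and the query are placeholders, and the read is
assumed to produce a PCollection of Entity, matching the write-side signature
shown in the tests.

    import com.google.datastore.v1beta3.Entity;
    import com.google.datastore.v1beta3.Query;
    import org.apache.beam.sdk.values.PCollection;

    // Read entities matching a query from a project (placeholder values).
    PCollection<Entity> entities = p.apply(
        DatastoreIO.v1beta3().read()
            .withProjectId("my-project")             // assumed project id
            .withQuery(Query.newBuilder().build())   // assumed query
            .withNamespace("my-namespace"));         // optional; may be omitted

    // Write the entities back; each entity must have a complete key.
    entities.apply(DatastoreIO.v1beta3().write().withProjectId("my-project"));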


[5/5] incubator-beam git commit: [BEAM-77] Move datastore into GCP IO package

Posted by lc...@apache.org.
[BEAM-77] Move datastore into GCP IO package

This closes #584


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf874d42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf874d42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf874d42

Branch: refs/heads/master
Commit: cf874d426a937740a8ea14fd5ddbfddad7d9ecd3
Parents: 9d70025 c2ccd68
Author: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Authored: Mon Jul 11 10:07:17 2016 -0400
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:07:17 2016 -0400

----------------------------------------------------------------------
 examples/java/pom.xml                           |   5 +
 .../beam/examples/complete/AutoComplete.java    |   2 +-
 .../examples/cookbook/DatastoreWordCount.java   |   4 +-
 pom.xml                                         |   6 +
 runners/core-java/pom.xml                       |  27 +-
 sdks/java/core/pom.xml                          |  10 -
 .../beam/sdk/io/datastore/DatastoreIO.java      |  41 -
 .../apache/beam/sdk/io/datastore/V1Beta3.java   | 992 -------------------
 .../beam/sdk/io/datastore/package-info.java     |  24 -
 .../beam/sdk/io/datastore/V1Beta3Test.java      | 617 ------------
 sdks/java/io/google-cloud-platform/pom.xml      |  28 +
 .../beam/sdk/io/gcp/datastore/DatastoreIO.java  |  41 +
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 992 +++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/package-info.java |  24 +
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  | 617 ++++++++++++
 15 files changed, 1722 insertions(+), 1708 deletions(-)
----------------------------------------------------------------------



[4/5] incubator-beam git commit: Clean up duplicate dependencies in runners core java pom

Posted by lc...@apache.org.
Clean up duplicate dependencies in runners core java pom


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cccbacf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cccbacf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cccbacf

Branch: refs/heads/master
Commit: 1cccbacf6ccc5f1942848ba812cdb3d3d5a2e6c5
Parents: 9d70025
Author: Dan Halperin <dh...@google.com>
Authored: Sat Jul 2 12:09:13 2016 -0700
Committer: Luke Cwik <lc...@visitor-lcwik.wat.corp.google.com>
Committed: Mon Jul 11 10:06:19 2016 -0400

----------------------------------------------------------------------
 runners/core-java/pom.xml | 27 ++++++---------------------
 1 file changed, 6 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cccbacf/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index c7eea4a..fc8be0a 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -182,46 +182,31 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
+    <!-- build dependencies -->
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>annotations</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <!-- build dependencies -->
-
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>annotations</artifactId>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
     </dependency>
 
     <!-- test dependencies -->