You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 22:10:13 UTC

[1/2] incubator-beam git commit: Query latest timestamp

Repository: incubator-beam
Updated Branches:
  refs/heads/master de249e614 -> 9abd0bc8a


Query latest timestamp


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9d0f8d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9d0f8d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9d0f8d2

Branch: refs/heads/master
Commit: d9d0f8d28f7ef31f91998f60ce4e8bfab1cee913
Parents: de249e6
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Tue Aug 23 16:44:08 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 15:09:15 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 41 +++++++++++--
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 60 +++++++++++++++++---
 2 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9d0f8d2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index c7433d3..8456e02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -239,6 +239,7 @@ public class DatastoreV1 {
       int numSplits;
       try {
         long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
+        LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes);
         numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX,
             Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES));
       } catch (Exception e) {
@@ -250,6 +251,33 @@ public class DatastoreV1 {
     }
 
     /**
+     * Datastore system tables with statistics are periodically updated. This method fetches
+     * the timestamp (in microseconds) of the latest statistics update from the
+     * {@code __Stat_Total__} table, or {@code __Stat_Ns_Total__} when a namespace is given.
+     */
+    private static long queryLatestStatisticsTimestamp(Datastore datastore,
+        @Nullable String namespace)  throws DatastoreException {
+      Query.Builder query = Query.newBuilder();
+      if (namespace == null) {
+        query.addKindBuilder().setName("__Stat_Total__");
+      } else {
+        query.addKindBuilder().setName("__Stat_Ns_Total__");
+      }
+      query.addOrder(makeOrder("timestamp", DESCENDING));
+      query.setLimit(Int32Value.newBuilder().setValue(1));
+      RunQueryRequest request = makeRequest(query.build(), namespace);
+
+      RunQueryResponse response = datastore.runQuery(request);
+      QueryResultBatch batch = response.getBatch();
+      if (batch.getEntityResultsCount() == 0) {
+        throw new NoSuchElementException(
+            "Datastore total statistics unavailable");
+      }
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000;
+    }
+
+    /**
      * Get the estimated size of the data returned by the given query.
      *
      * <p>Datastore provides no way to get a good estimate of how large the result of a query
@@ -261,17 +289,17 @@ public class DatastoreV1 {
     static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
         throws DatastoreException {
       String ourKind = query.getKind(0).getName();
+      long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
+      LOG.info("Latest stats timestamp : {}", latestTimestamp);
+
       Query.Builder queryBuilder = Query.newBuilder();
       if (namespace == null) {
         queryBuilder.addKindBuilder().setName("__Stat_Kind__");
       } else {
-        queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__");
+        queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__");
       }
       queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
-
-      // Get the latest statistics
-      queryBuilder.addOrder(makeOrder("timestamp", DESCENDING));
-      queryBuilder.setLimit(Int32Value.newBuilder().setValue(1));
+      queryBuilder.setFilter(makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()));
 
       RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
 
@@ -547,6 +575,7 @@ public class DatastoreV1 {
           estimatedNumSplits = numSplits;
         }
 
+        LOG.info("Splitting the query into {} splits", estimatedNumSplits);
         List<Query> querySplits;
         try {
           querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter,
@@ -866,7 +895,7 @@ public class DatastoreV1 {
 
     @FinishBundle
     public void finishBundle(Context c) throws Exception {
-      if (mutations.size() > 0) {
+      if (!mutations.isEmpty()) {
         flushBatch();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9d0f8d2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index ab1df2f..138671d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -61,6 +61,7 @@ import com.google.datastore.v1.client.Datastore;
 import com.google.datastore.v1.client.QuerySplitter;
 import com.google.protobuf.Int32Value;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -561,14 +562,23 @@ public class DatastoreV1Test {
   @Test
   public void testEstimatedSizeBytes() throws Exception {
     long entityBytes = 100L;
+    // In seconds
+    long timestamp = 1234L;
+
+    RunQueryRequest latestTimestampRequest = makeRequest(makeLatestTimestampQuery(NAMESPACE),
+        NAMESPACE);
+    RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
     // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
     RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
 
+    when(mockDatastore.runQuery(latestTimestampRequest))
+        .thenReturn(latestTimestampResponse);
     when(mockDatastore.runQuery(statRequest))
         .thenReturn(statResponse);
 
     assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
+    verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
     verify(mockDatastore, times(1)).runQuery(statRequest);
   }
 
@@ -609,11 +619,19 @@ public class DatastoreV1Test {
     int numSplits = 0;
     int expectedNumSplits = 20;
     long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
+    // In seconds
+    long timestamp = 1234L;
+
+    RunQueryRequest latestTimestampRequest = makeRequest(makeLatestTimestampQuery(NAMESPACE),
+        NAMESPACE);
+    RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
 
     // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
     RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
 
+    when(mockDatastore.runQuery(latestTimestampRequest))
+        .thenReturn(latestTimestampResponse);
     when(mockDatastore.runQuery(statRequest))
         .thenReturn(statResponse);
     when(mockQuerySplitter.getSplits(
@@ -629,6 +647,7 @@ public class DatastoreV1Test {
     verifyUniqueKeys(queries);
     verify(mockQuerySplitter, times(1)).getSplits(
         eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
+    verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
     verify(mockDatastore, times(1)).runQuery(statRequest);
   }
 
@@ -752,7 +771,7 @@ public class DatastoreV1Test {
 
   /** Builds a per-kind statistics response with the given entity size. */
   private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
-    RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
+    RunQueryResponse.Builder statKindResponse = RunQueryResponse.newBuilder();
     Entity.Builder entity = Entity.newBuilder();
     entity.setKey(makeKey("dummyKind", "dummyId"));
     entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
@@ -760,24 +779,51 @@ public class DatastoreV1Test {
     entityResult.setEntity(entity);
     QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
     batch.addEntityResults(entityResult);
+    statKindResponse.setBatch(batch);
+    return statKindResponse.build();
+  }
+
+  /** Builds a query response with a single entity holding the given timestamp (in seconds). */
+  private static RunQueryResponse makeLatestTimestampResponse(long timestamp) {
+    RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
+    Entity.Builder entity = Entity.newBuilder();
+    entity.setKey(makeKey("dummyKind", "dummyId"));
+    entity.getMutableProperties().put("timestamp", makeValue(new Date(timestamp * 1000)).build());
+    EntityResult.Builder entityResult = EntityResult.newBuilder();
+    entityResult.setEntity(entity);
+    QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
+    batch.addEntityResults(entityResult);
     timestampResponse.setBatch(batch);
     return timestampResponse.build();
   }
 
   /** Builds a per-kind statistics query for the given timestamp and namespace. */
-  private static Query makeStatKindQuery(String namespace) {
+  private static Query makeStatKindQuery(String namespace, long timestamp) {
     Query.Builder statQuery = Query.newBuilder();
     if (namespace == null) {
       statQuery.addKindBuilder().setName("__Stat_Kind__");
     } else {
-      statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
+      statQuery.addKindBuilder().setName("__Stat_Ns_Kind__");
     }
     statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
-    statQuery.addOrder(makeOrder("timestamp", DESCENDING));
-    statQuery.setLimit(Int32Value.newBuilder().setValue(1));
+    statQuery.setFilter(makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L)).build());
     return statQuery.build();
   }
 
+  /** Builds a latest timestamp statistics query. */
+  private static Query makeLatestTimestampQuery(String namespace) {
+    Query.Builder timestampQuery = Query.newBuilder();
+    if (namespace == null) {
+      timestampQuery.addKindBuilder().setName("__Stat_Total__");
+    } else {
+      timestampQuery.addKindBuilder().setName("__Stat_Ns_Total__");
+    }
+    timestampQuery.addOrder(makeOrder("timestamp", DESCENDING));
+    timestampQuery.setLimit(Int32Value.newBuilder().setValue(1));
+    return timestampQuery.build();
+  }
+
+
   /** Generate dummy query splits. */
   private List<Query> splitQuery(Query query, int numSplits) {
     List<Query> queries = new LinkedList<>();


[2/2] incubator-beam git commit: Closes #868

Posted by dh...@apache.org.
Closes #868


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9abd0bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9abd0bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9abd0bc8

Branch: refs/heads/master
Commit: 9abd0bc8a42568fd034930aecaa1af91d77486d4
Parents: de249e6 d9d0f8d
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 15:10:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 15:10:00 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 41 +++++++++++--
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 60 +++++++++++++++++---
 2 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------