You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 22:10:13 UTC
[1/2] incubator-beam git commit: Query latest timestamp
Repository: incubator-beam
Updated Branches:
refs/heads/master de249e614 -> 9abd0bc8a
Query latest timestamp
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9d0f8d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9d0f8d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9d0f8d2
Branch: refs/heads/master
Commit: d9d0f8d28f7ef31f91998f60ce4e8bfab1cee913
Parents: de249e6
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Tue Aug 23 16:44:08 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 15:09:15 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 41 +++++++++++--
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 60 +++++++++++++++++---
2 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9d0f8d2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index c7433d3..8456e02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -239,6 +239,7 @@ public class DatastoreV1 {
int numSplits;
try {
long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
+ LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes);
numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX,
Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES));
} catch (Exception e) {
@@ -250,6 +251,33 @@ public class DatastoreV1 {
}
/**
+ * Datastore system tables with statistics are periodically updated. This method fetches
+ * the latest timestamp (in microseconds) of statistics update using the {@code __Stat_Total__}
+ * table, or the {@code __Stat_Ns_Total__} table when {@code namespace} is non-null.
+ *
+ * @param datastore client used to run the statistics query
+ * @param namespace namespace whose statistics are read, or {@code null} for the default one
+ * @return timestamp of the most recent statistics update, in microseconds since the epoch
+ * @throws DatastoreException if running the statistics query fails
+ * @throws NoSuchElementException if no statistics entity is returned, or it has no
+ * {@code timestamp} property
+ */
+ private static long queryLatestStatisticsTimestamp(Datastore datastore,
+ @Nullable String namespace) throws DatastoreException {
+ Query.Builder query = Query.newBuilder();
+ if (namespace == null) {
+ query.addKindBuilder().setName("__Stat_Total__");
+ } else {
+ query.addKindBuilder().setName("__Stat_Ns_Total__");
+ }
+ // Only the newest statistics entity is needed.
+ query.addOrder(makeOrder("timestamp", DESCENDING));
+ query.setLimit(Int32Value.newBuilder().setValue(1));
+ RunQueryRequest request = makeRequest(query.build(), namespace);
+
+ RunQueryResponse response = datastore.runQuery(request);
+ QueryResultBatch batch = response.getBatch();
+ if (batch.getEntityResultsCount() == 0) {
+ throw new NoSuchElementException(
+ "Datastore total statistics unavailable");
+ }
+ Entity entity = batch.getEntityResults(0).getEntity();
+ if (!entity.getProperties().containsKey("timestamp")) {
+ throw new NoSuchElementException(
+ "Datastore total statistics entity has no timestamp property");
+ }
+ // Convert the protobuf Timestamp to microseconds, keeping the sub-second part: truncating
+ // to whole seconds would make a later equality filter on this value miss any statistics
+ // entity whose timestamp has a non-zero nanos component.
+ long seconds = entity.getProperties().get("timestamp").getTimestampValue().getSeconds();
+ int nanos = entity.getProperties().get("timestamp").getTimestampValue().getNanos();
+ return seconds * 1000000L + nanos / 1000;
+ }
+
+ /**
* Get the estimated size of the data returned by the given query.
*
* <p>Datastore provides no way to get a good estimate of how large the result of a query
@@ -261,17 +289,17 @@ public class DatastoreV1 {
static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
throws DatastoreException {
String ourKind = query.getKind(0).getName();
+ long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
+ LOG.info("Latest stats timestamp : {}", latestTimestamp);
+
Query.Builder queryBuilder = Query.newBuilder();
if (namespace == null) {
queryBuilder.addKindBuilder().setName("__Stat_Kind__");
} else {
- queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__");
+ queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__");
}
queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
-
- // Get the latest statistics
- queryBuilder.addOrder(makeOrder("timestamp", DESCENDING));
- queryBuilder.setLimit(Int32Value.newBuilder().setValue(1));
+ queryBuilder.setFilter(makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()));
RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
@@ -547,6 +575,7 @@ public class DatastoreV1 {
estimatedNumSplits = numSplits;
}
+ LOG.info("Splitting the query into {} splits", estimatedNumSplits);
List<Query> querySplits;
try {
querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter,
@@ -866,7 +895,7 @@ public class DatastoreV1 {
+ /** Flushes any entries still held in {@code mutations} when the bundle finishes. */
@FinishBundle
public void finishBundle(Context c) throws Exception {
- if (mutations.size() > 0) {
+ if (!mutations.isEmpty()) {
flushBatch();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9d0f8d2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index ab1df2f..138671d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -61,6 +61,7 @@ import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.QuerySplitter;
import com.google.protobuf.Int32Value;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -561,14 +562,23 @@ public class DatastoreV1Test {
+ /**
+ * Verifies that getEstimatedSizeBytes issues both the latest-statistics-timestamp query and
+ * the per-kind statistics query (once each) and returns the stats entity's entity_bytes.
+ */
@Test
public void testEstimatedSizeBytes() throws Exception {
long entityBytes = 100L;
+ // In seconds
+ long timestamp = 1234L;
+
+ RunQueryRequest latestTimestampRequest = makeRequest(makeLatestTimestampQuery(NAMESPACE),
+ NAMESPACE);
+ RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
// Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+ RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+ when(mockDatastore.runQuery(latestTimestampRequest))
+ .thenReturn(latestTimestampResponse);
when(mockDatastore.runQuery(statRequest))
.thenReturn(statResponse);
assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
+ verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
verify(mockDatastore, times(1)).runQuery(statRequest);
}
@@ -609,11 +619,19 @@ public class DatastoreV1Test {
int numSplits = 0;
int expectedNumSplits = 20;
long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
+ // In seconds
+ long timestamp = 1234L;
+
+ RunQueryRequest latestTimestampRequest = makeRequest(makeLatestTimestampQuery(NAMESPACE),
+ NAMESPACE);
+ RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
// Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+ RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+ when(mockDatastore.runQuery(latestTimestampRequest))
+ .thenReturn(latestTimestampResponse);
when(mockDatastore.runQuery(statRequest))
.thenReturn(statResponse);
when(mockQuerySplitter.getSplits(
@@ -629,6 +647,7 @@ public class DatastoreV1Test {
verifyUniqueKeys(queries);
verify(mockQuerySplitter, times(1)).getSplits(
eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
+ verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
verify(mockDatastore, times(1)).runQuery(statRequest);
}
@@ -752,7 +771,7 @@ public class DatastoreV1Test {
/** Builds a per-kind statistics response with the given entity size. */
private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
- RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
+ RunQueryResponse.Builder statKindResponse = RunQueryResponse.newBuilder();
Entity.Builder entity = Entity.newBuilder();
entity.setKey(makeKey("dummyKind", "dummyId"));
entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
@@ -760,24 +779,51 @@ public class DatastoreV1Test {
entityResult.setEntity(entity);
QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
batch.addEntityResults(entityResult);
+ statKindResponse.setBatch(batch);
+ return statKindResponse.build();
+ }
+
+ /**
+ * Builds a latest-statistics-timestamp query response holding a single entity whose
+ * {@code timestamp} property is {@code timestamp} seconds since the epoch.
+ */
+ private static RunQueryResponse makeLatestTimestampResponse(long timestamp) {
+ RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
+ Entity.Builder entity = Entity.newBuilder();
+ entity.setKey(makeKey("dummyKind", "dummyId"));
+ // Date takes milliseconds, so scale the seconds value by 1000.
+ entity.getMutableProperties().put("timestamp", makeValue(new Date(timestamp * 1000)).build());
+ EntityResult.Builder entityResult = EntityResult.newBuilder();
+ entityResult.setEntity(entity);
+ QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
+ batch.addEntityResults(entityResult);
timestampResponse.setBatch(batch);
return timestampResponse.build();
}
/** Builds a per-kind statistics query for the given timestamp and namespace. */
- private static Query makeStatKindQuery(String namespace) {
+ private static Query makeStatKindQuery(String namespace, long timestamp) {
Query.Builder statQuery = Query.newBuilder();
if (namespace == null) {
statQuery.addKindBuilder().setName("__Stat_Kind__");
} else {
- statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
+ statQuery.addKindBuilder().setName("__Stat_Ns_Kind__");
}
statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
- statQuery.addOrder(makeOrder("timestamp", DESCENDING));
- statQuery.setLimit(Int32Value.newBuilder().setValue(1));
+ // NOTE(review): setFilter replaces the kind_name filter set just above, so this query
+ // filters on timestamp only. getEstimatedSizeBytes builds its request the same way; fix
+ // both together with a composite AND filter (e.g. DatastoreHelper.makeAndFilter) so the
+ // mocked request keeps matching the real one.
+ // timestamp is in seconds; the filter value is converted to microseconds.
+ statQuery.setFilter(makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L)).build());
return statQuery.build();
}
+ /**
+ * Builds the query for the most recent statistics timestamp: the newest (by {@code timestamp},
+ * descending, limit 1) entity of {@code __Stat_Total__}, or of {@code __Stat_Ns_Total__} when
+ * {@code namespace} is non-null.
+ */
+ private static Query makeLatestTimestampQuery(String namespace) {
+ Query.Builder timestampQuery = Query.newBuilder();
+ if (namespace == null) {
+ timestampQuery.addKindBuilder().setName("__Stat_Total__");
+ } else {
+ timestampQuery.addKindBuilder().setName("__Stat_Ns_Total__");
+ }
+ timestampQuery.addOrder(makeOrder("timestamp", DESCENDING));
+ timestampQuery.setLimit(Int32Value.newBuilder().setValue(1));
+ return timestampQuery.build();
+ }
+
+
/** Generate dummy query splits. */
private List<Query> splitQuery(Query query, int numSplits) {
List<Query> queries = new LinkedList<>();
[2/2] incubator-beam git commit: Closes #868
Posted by dh...@apache.org.
Closes #868
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9abd0bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9abd0bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9abd0bc8
Branch: refs/heads/master
Commit: 9abd0bc8a42568fd034930aecaa1af91d77486d4
Parents: de249e6 d9d0f8d
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 15:10:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 15:10:00 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 41 +++++++++++--
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 60 +++++++++++++++++---
2 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------