You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2019/11/11 19:11:56 UTC
[beam] branch master updated: Update
BigQueryHllSketchCompatibilityIT to cover empty sketch cases
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new eaeeb6a Update BigQueryHllSketchCompatibilityIT to cover empty sketch cases
new 47d5ee9 Merge pull request #9778 from robinyqiu/hll
eaeeb6a is described below
commit eaeeb6a93e35ad1c75e2cf650525db70d64ae608
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Fri Oct 11 16:28:03 2019 -0700
Update BigQueryHllSketchCompatibilityIT to cover empty sketch cases
---
.../beam/sdk/extensions/zetasketch/HllCount.java | 19 ++++
.../BigQueryHllSketchCompatibilityIT.java | 124 +++++++++++++++------
.../sdk/extensions/zetasketch/HllCountTest.java | 14 +++
3 files changed, 125 insertions(+), 32 deletions(-)
diff --git a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java
index 5a975da..e4851bd 100644
--- a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java
+++ b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.extensions.zetasketch;
import com.google.zetasketch.HyperLogLogPlusPlus;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
@@ -108,6 +110,23 @@ public final class HllCount {
private HllCount() {}
/**
+ * Converts the passed-in sketch from {@code ByteBuffer} to {@code byte[]}, mapping {@code null
+ * ByteBuffer}s (representing empty sketches) to empty {@code byte[]}s.
+ *
+ * <p>Utility method to convert sketches materialized with ZetaSQL/BigQuery to valid inputs for
+ * Beam {@code HllCount} transforms. The position of {@code bf} is left unchanged.
+ */
+ public static byte[] getSketchFromByteBuffer(@Nullable ByteBuffer bf) {
+ if (bf == null) {
+ return new byte[0];
+ }
+ // Read via a duplicate: consuming bf would make a repeated call return an empty sketch.
+ byte[] result = new byte[bf.remaining()];
+ bf.duplicate().get(result);
+ return result;
+ }
+
+ /**
* Provides {@code PTransform}s to aggregate inputs into HLL++ sketches. The four supported input
* types are {@code Integer}, {@code Long}, {@code String}, and {@code byte[]}.
*
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
index 3f7927d..462a715 100644
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
+++ b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -65,23 +66,32 @@ public class BigQueryHllSketchCompatibilityIT {
private static final List<String> TEST_DATA =
Arrays.asList("Apple", "Orange", "Banana", "Orange");
- // Data Table: used by testReadSketchFromBigQuery())
+ // Data Table: used by tests reading sketches from BigQuery
// Schema: only one STRING field named "data".
- // Content: prepopulated with 4 rows: "Apple", "Orange", "Banana", "Orange"
- private static final String DATA_TABLE_ID = "hll_data";
private static final String DATA_FIELD_NAME = "data";
private static final String DATA_FIELD_TYPE = "STRING";
private static final String QUERY_RESULT_FIELD_NAME = "sketch";
- private static final Long EXPECTED_COUNT = 3L;
- // Sketch Table: used by testWriteSketchToBigQuery()
+ // Content: prepopulated with 4 rows: "Apple", "Orange", "Banana", "Orange"
+ private static final String DATA_TABLE_ID_NON_EMPTY = "hll_data_non_empty";
+ private static final Long EXPECTED_COUNT_NON_EMPTY = 3L;
+
+ // Content: empty
+ private static final String DATA_TABLE_ID_EMPTY = "hll_data_empty";
+ private static final Long EXPECTED_COUNT_EMPTY = 0L;
+
+ // Sketch Table: used by tests writing sketches to BigQuery
// Schema: only one BYTES field named "sketch".
- // Content: will be overridden by the sketch computed by the test pipeline each time the test runs
- private static final String SKETCH_TABLE_ID = "hll_sketch";
private static final String SKETCH_FIELD_NAME = "sketch";
private static final String SKETCH_FIELD_TYPE = "BYTES";
+
+ // Content: will be overridden by the sketch computed by the test pipeline each time the test runs
+ private static final String SKETCH_TABLE_ID = "hll_sketch";
// SHA-1 hash of string "[3]", the string representation of a row that has only one field 3 in it
- private static final String EXPECTED_CHECKSUM = "f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
+ private static final String EXPECTED_CHECKSUM_NON_EMPTY =
+ "f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
+ // SHA-1 hash of string "[0]", the string representation of a row that has only one field 0 in it
+ private static final String EXPECTED_CHECKSUM_EMPTY = "1184f5b8d4b6dd08709cf1513f26744167065e0d";
static {
ApplicationNameOptions options =
@@ -93,31 +103,40 @@ public class BigQueryHllSketchCompatibilityIT {
}
@BeforeClass
- public static void prepareDatasetAndDataTable() throws Exception {
+ public static void prepareDatasetAndDataTables() throws Exception {
BIGQUERY_CLIENT.createNewDataset(PROJECT_ID, DATASET_ID);
- // Create Data Table
TableSchema dataTableSchema =
new TableSchema()
.setFields(
Collections.singletonList(
new TableFieldSchema().setName(DATA_FIELD_NAME).setType(DATA_FIELD_TYPE)));
- Table dataTable =
+
+ Table dataTableNonEmpty =
new Table()
.setSchema(dataTableSchema)
.setTableReference(
new TableReference()
.setProjectId(PROJECT_ID)
.setDatasetId(DATASET_ID)
- .setTableId(DATA_TABLE_ID));
- BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTable);
-
- // Prepopulate test data to Data Table
+ .setTableId(DATA_TABLE_ID_NON_EMPTY));
+ BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableNonEmpty);
+ // Prepopulates dataTableNonEmpty with TEST_DATA
List<Map<String, Object>> rows =
TEST_DATA.stream()
.map(v -> Collections.singletonMap(DATA_FIELD_NAME, (Object) v))
.collect(Collectors.toList());
- BIGQUERY_CLIENT.insertDataToTable(PROJECT_ID, DATASET_ID, DATA_TABLE_ID, rows);
+ BIGQUERY_CLIENT.insertDataToTable(PROJECT_ID, DATASET_ID, DATA_TABLE_ID_NON_EMPTY, rows);
+
+ Table dataTableEmpty =
+ new Table()
+ .setSchema(dataTableSchema)
+ .setTableReference(
+ new TableReference()
+ .setProjectId(PROJECT_ID)
+ .setDatasetId(DATASET_ID)
+ .setTableId(DATA_TABLE_ID_EMPTY));
+ BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableEmpty);
}
@AfterClass
@@ -126,22 +145,41 @@ public class BigQueryHllSketchCompatibilityIT {
}
/**
- * Test that HLL++ sketch computed in BigQuery can be processed by Beam. Hll sketch is computed by
- * {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the test verifies that we can run {@link
- * HllCount.MergePartial} and {@link HllCount.Extract} on the sketch in Beam to get the correct
- * estimated count.
+ * Tests that a non-empty HLL++ sketch computed in BigQuery can be processed by Beam.
+ *
+ * <p>The Hll sketch is computed by {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the
+ * test verifies that we can run {@link HllCount.MergePartial} and {@link HllCount.Extract} on the
+ * sketch in Beam to get the correct estimated count.
+ */
+ @Test
+ public void testReadNonEmptySketchFromBigQuery() {
+ readSketchFromBigQuery(DATA_TABLE_ID_NON_EMPTY, EXPECTED_COUNT_NON_EMPTY);
+ }
+
+ /**
+ * Tests that an empty HLL++ sketch computed in BigQuery can be processed by Beam.
+ *
+ * <p>The Hll sketch is computed by {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the
+ * test verifies that we can run {@link HllCount.MergePartial} and {@link HllCount.Extract} on the
+ * sketch in Beam to get the correct estimated count.
*/
@Test
- public void testReadSketchFromBigQuery() {
- String tableSpec = String.format("%s.%s", DATASET_ID, DATA_TABLE_ID);
+ public void testReadEmptySketchFromBigQuery() {
+ readSketchFromBigQuery(DATA_TABLE_ID_EMPTY, EXPECTED_COUNT_EMPTY);
+ }
+
+ private void readSketchFromBigQuery(String tableId, Long expectedCount) {
+ String tableSpec = String.format("%s.%s", DATASET_ID, tableId);
String query =
String.format(
"SELECT HLL_COUNT.INIT(%s) AS %s FROM %s",
DATA_FIELD_NAME, QUERY_RESULT_FIELD_NAME, tableSpec);
+
SerializableFunction<SchemaAndRecord, byte[]> parseQueryResultToByteArray =
- (SchemaAndRecord schemaAndRecord) ->
+ input ->
// BigQuery BYTES type corresponds to Java java.nio.ByteBuffer type
- ((ByteBuffer) schemaAndRecord.getRecord().get(QUERY_RESULT_FIELD_NAME)).array();
+ HllCount.getSketchFromByteBuffer(
+ (ByteBuffer) input.getRecord().get(QUERY_RESULT_FIELD_NAME));
TestPipelineOptions options =
TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
@@ -156,17 +194,35 @@ public class BigQueryHllSketchCompatibilityIT {
.withCoder(ByteArrayCoder.of()))
.apply(HllCount.MergePartial.globally()) // no-op, only for testing MergePartial
.apply(HllCount.Extract.globally());
- PAssert.thatSingleton(result).isEqualTo(EXPECTED_COUNT);
+ PAssert.thatSingleton(result).isEqualTo(expectedCount);
p.run().waitUntilFinish();
}
/**
- * Test that HLL++ sketch computed in Beam can be processed by BigQuery. Hll sketch is computed by
- * {@link HllCount.Init} in Beam and written to BigQuery; the test verifies that we can run {@code
- * HLL_COUNT.EXTRACT()} on the sketch in BigQuery to get the correct estimated count.
+ * Tests that a non-empty HLL++ sketch computed in Beam can be processed by BigQuery.
+ *
+ * <p>The Hll sketch is computed by {@link HllCount.Init} in Beam and written to BigQuery; the
+ * test verifies that we can run {@code HLL_COUNT.EXTRACT()} on the sketch in BigQuery to get the
+ * correct estimated count.
+ */
+ @Test
+ public void testWriteNonEmptySketchToBigQuery() {
+ writeSketchToBigQuery(TEST_DATA, EXPECTED_CHECKSUM_NON_EMPTY);
+ }
+
+ /**
+ * Tests that an empty HLL++ sketch computed in Beam can be processed by BigQuery.
+ *
+ * <p>The Hll sketch is computed by {@link HllCount.Init} in Beam and written to BigQuery; the
+ * test verifies that we can run {@code HLL_COUNT.EXTRACT()} on the sketch in BigQuery to get the
+ * correct estimated count.
*/
@Test
- public void testWriteSketchToBigQuery() {
+ public void testWriteEmptySketchToBigQuery() {
+ writeSketchToBigQuery(Collections.emptyList(), EXPECTED_CHECKSUM_EMPTY);
+ }
+
+ private void writeSketchToBigQuery(List<String> testData, String expectedChecksum) {
String tableSpec = String.format("%s.%s", DATASET_ID, SKETCH_TABLE_ID);
String query =
String.format("SELECT HLL_COUNT.EXTRACT(%s) FROM %s", SKETCH_FIELD_NAME, tableSpec);
@@ -181,16 +237,20 @@ public class BigQueryHllSketchCompatibilityIT {
// After the pipeline finishes, BigqueryMatcher will send a query to retrieve the estimated
// count and verifies its correctness using checksum.
options.setOnSuccessMatcher(
- BigqueryMatcher.createUsingStandardSql(APP_NAME, PROJECT_ID, query, EXPECTED_CHECKSUM));
+ BigqueryMatcher.createUsingStandardSql(APP_NAME, PROJECT_ID, query, expectedChecksum));
Pipeline p = Pipeline.create(options);
- p.apply(Create.of(TEST_DATA))
+ p.apply(Create.of(testData).withType(TypeDescriptor.of(String.class)))
.apply(HllCount.Init.forStrings().globally())
.apply(
BigQueryIO.<byte[]>write()
.to(tableSpec)
.withSchema(tableSchema)
- .withFormatFunction(sketch -> new TableRow().set(SKETCH_FIELD_NAME, sketch))
+ .withFormatFunction(
+ sketch ->
+ // Empty sketch is represented by empty byte array in Beam and by null in
+ // BigQuery
+ new TableRow().set(SKETCH_FIELD_NAME, sketch.length == 0 ? null : sketch))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run().waitUntilFinish();
}
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java
index 4ef3e6a..137e976 100644
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java
+++ b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java
@@ -17,8 +17,11 @@
*/
package org.apache.beam.sdk.extensions.zetasketch;
+import static org.junit.Assert.assertArrayEquals;
+
import com.google.zetasketch.HyperLogLogPlusPlus;
import com.google.zetasketch.shaded.com.google.protobuf.ByteString;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -484,4 +487,15 @@ public class HllCountTest {
PAssert.thatSingleton(result).isEqualTo(KV.of("k", 0L));
p.run();
}
+
+ @Test
+ public void testGetSketchFromByteBufferForEmptySketch() {
+ assertArrayEquals(EMPTY_SKETCH, HllCount.getSketchFromByteBuffer(null)); // (expected, actual)
+ }
+
+ @Test
+ public void testGetSketchFromByteBufferForNonEmptySketch() {
+ ByteBuffer bf = ByteBuffer.wrap(INTS1_SKETCH);
+ assertArrayEquals(INTS1_SKETCH, HllCount.getSketchFromByteBuffer(bf)); // (expected, actual)
+ }
}