You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2019/11/11 19:11:56 UTC

[beam] branch master updated: Update BigQueryHllSketchCompatibilityIT to cover empty sketch cases

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new eaeeb6a  Update BigQueryHllSketchCompatibilityIT to cover empty sketch cases
     new 47d5ee9  Merge pull request #9778 from robinyqiu/hll
eaeeb6a is described below

commit eaeeb6a93e35ad1c75e2cf650525db70d64ae608
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Fri Oct 11 16:28:03 2019 -0700

    Update BigQueryHllSketchCompatibilityIT to cover empty sketch cases
---
 .../beam/sdk/extensions/zetasketch/HllCount.java   |  19 ++++
 .../BigQueryHllSketchCompatibilityIT.java          | 124 +++++++++++++++------
 .../sdk/extensions/zetasketch/HllCountTest.java    |  14 +++
 3 files changed, 125 insertions(+), 32 deletions(-)

diff --git a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java
index 5a975da..e4851bd 100644
--- a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java
+++ b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/HllCount.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.extensions.zetasketch;
 
 import com.google.zetasketch.HyperLogLogPlusPlus;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -108,6 +110,23 @@ public final class HllCount {
   private HllCount() {}
 
   /**
+   * Converts the passed-in sketch from {@code ByteBuffer} to {@code byte[]}, mapping {@code null
+   * ByteBuffer}s (representing empty sketches) to empty {@code byte[]}s.
+   *
+   * <p>Utility method to convert sketches materialized with ZetaSQL/BigQuery to valid inputs for
+   * Beam {@code HllCount} transforms.
+   */
+  public static byte[] getSketchFromByteBuffer(@Nullable ByteBuffer bf) {
+    if (bf == null) {
+      return new byte[0];
+    }
+    byte[] result = new byte[bf.remaining()];
+    // Read through a duplicate so the caller's buffer position is left untouched.
+    bf.duplicate().get(result);
+    return result;
+  }
+
+  /**
    * Provides {@code PTransform}s to aggregate inputs into HLL++ sketches. The four supported input
    * types are {@code Integer}, {@code Long}, {@code String}, and {@code byte[]}.
    *
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
index 3f7927d..462a715 100644
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
+++ b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -65,23 +66,32 @@ public class BigQueryHllSketchCompatibilityIT {
   private static final List<String> TEST_DATA =
       Arrays.asList("Apple", "Orange", "Banana", "Orange");
 
-  // Data Table: used by testReadSketchFromBigQuery())
+  // Data Table: used by tests reading sketches from BigQuery
   // Schema: only one STRING field named "data".
-  // Content: prepopulated with 4 rows: "Apple", "Orange", "Banana", "Orange"
-  private static final String DATA_TABLE_ID = "hll_data";
   private static final String DATA_FIELD_NAME = "data";
   private static final String DATA_FIELD_TYPE = "STRING";
   private static final String QUERY_RESULT_FIELD_NAME = "sketch";
-  private static final Long EXPECTED_COUNT = 3L;
 
-  // Sketch Table: used by testWriteSketchToBigQuery()
+  // Content: prepopulated with 4 rows: "Apple", "Orange", "Banana", "Orange"
+  private static final String DATA_TABLE_ID_NON_EMPTY = "hll_data_non_empty";
+  private static final Long EXPECTED_COUNT_NON_EMPTY = 3L;
+
+  // Content: empty
+  private static final String DATA_TABLE_ID_EMPTY = "hll_data_empty";
+  private static final Long EXPECTED_COUNT_EMPTY = 0L;
+
+  // Sketch Table: used by tests writing sketches to BigQuery
   // Schema: only one BYTES field named "sketch".
-  // Content: will be overridden by the sketch computed by the test pipeline each time the test runs
-  private static final String SKETCH_TABLE_ID = "hll_sketch";
   private static final String SKETCH_FIELD_NAME = "sketch";
   private static final String SKETCH_FIELD_TYPE = "BYTES";
+
+  // Content: will be overridden by the sketch computed by the test pipeline each time the test runs
+  private static final String SKETCH_TABLE_ID = "hll_sketch";
   // SHA-1 hash of string "[3]", the string representation of a row that has only one field 3 in it
-  private static final String EXPECTED_CHECKSUM = "f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
+  private static final String EXPECTED_CHECKSUM_NON_EMPTY =
+      "f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
+  // SHA-1 hash of string "[0]", the string representation of a row that has only one field 0 in it
+  private static final String EXPECTED_CHECKSUM_EMPTY = "1184f5b8d4b6dd08709cf1513f26744167065e0d";
 
   static {
     ApplicationNameOptions options =
@@ -93,31 +103,40 @@ public class BigQueryHllSketchCompatibilityIT {
   }
 
   @BeforeClass
-  public static void prepareDatasetAndDataTable() throws Exception {
+  public static void prepareDatasetAndDataTables() throws Exception {
     BIGQUERY_CLIENT.createNewDataset(PROJECT_ID, DATASET_ID);
 
-    // Create Data Table
     TableSchema dataTableSchema =
         new TableSchema()
             .setFields(
                 Collections.singletonList(
                     new TableFieldSchema().setName(DATA_FIELD_NAME).setType(DATA_FIELD_TYPE)));
-    Table dataTable =
+
+    Table dataTableNonEmpty =
         new Table()
             .setSchema(dataTableSchema)
             .setTableReference(
                 new TableReference()
                     .setProjectId(PROJECT_ID)
                     .setDatasetId(DATASET_ID)
-                    .setTableId(DATA_TABLE_ID));
-    BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTable);
-
-    // Prepopulate test data to Data Table
+                    .setTableId(DATA_TABLE_ID_NON_EMPTY));
+    BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableNonEmpty);
+    // Prepopulates dataTableNonEmpty with TEST_DATA
     List<Map<String, Object>> rows =
         TEST_DATA.stream()
             .map(v -> Collections.singletonMap(DATA_FIELD_NAME, (Object) v))
             .collect(Collectors.toList());
-    BIGQUERY_CLIENT.insertDataToTable(PROJECT_ID, DATASET_ID, DATA_TABLE_ID, rows);
+    BIGQUERY_CLIENT.insertDataToTable(PROJECT_ID, DATASET_ID, DATA_TABLE_ID_NON_EMPTY, rows);
+
+    Table dataTableEmpty =
+        new Table()
+            .setSchema(dataTableSchema)
+            .setTableReference(
+                new TableReference()
+                    .setProjectId(PROJECT_ID)
+                    .setDatasetId(DATASET_ID)
+                    .setTableId(DATA_TABLE_ID_EMPTY));
+    BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableEmpty);
   }
 
   @AfterClass
@@ -126,22 +145,41 @@ public class BigQueryHllSketchCompatibilityIT {
   }
 
   /**
-   * Test that HLL++ sketch computed in BigQuery can be processed by Beam. Hll sketch is computed by
-   * {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the test verifies that we can run {@link
-   * HllCount.MergePartial} and {@link HllCount.Extract} on the sketch in Beam to get the correct
-   * estimated count.
+   * Tests that a non-empty HLL++ sketch computed in BigQuery can be processed by Beam.
+   *
+   * <p>The Hll sketch is computed by {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the
+   * test verifies that we can run {@link HllCount.MergePartial} and {@link HllCount.Extract} on the
+   * sketch in Beam to get the correct estimated count.
+   */
+  @Test
+  public void testReadNonEmptySketchFromBigQuery() {
+    readSketchFromBigQuery(DATA_TABLE_ID_NON_EMPTY, EXPECTED_COUNT_NON_EMPTY);
+  }
+
+  /**
+   * Tests that an empty HLL++ sketch computed in BigQuery can be processed by Beam.
+   *
+   * <p>The Hll sketch is computed by {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the
+   * test verifies that we can run {@link HllCount.MergePartial} and {@link HllCount.Extract} on the
+   * sketch in Beam to get the correct estimated count.
    */
   @Test
-  public void testReadSketchFromBigQuery() {
-    String tableSpec = String.format("%s.%s", DATASET_ID, DATA_TABLE_ID);
+  public void testReadEmptySketchFromBigQuery() {
+    readSketchFromBigQuery(DATA_TABLE_ID_EMPTY, EXPECTED_COUNT_EMPTY);
+  }
+
+  private void readSketchFromBigQuery(String tableId, Long expectedCount) {
+    String tableSpec = String.format("%s.%s", DATASET_ID, tableId);
     String query =
         String.format(
             "SELECT HLL_COUNT.INIT(%s) AS %s FROM %s",
             DATA_FIELD_NAME, QUERY_RESULT_FIELD_NAME, tableSpec);
+
     SerializableFunction<SchemaAndRecord, byte[]> parseQueryResultToByteArray =
-        (SchemaAndRecord schemaAndRecord) ->
+        input ->
             // BigQuery BYTES type corresponds to Java java.nio.ByteBuffer type
-            ((ByteBuffer) schemaAndRecord.getRecord().get(QUERY_RESULT_FIELD_NAME)).array();
+            HllCount.getSketchFromByteBuffer(
+                (ByteBuffer) input.getRecord().get(QUERY_RESULT_FIELD_NAME));
 
     TestPipelineOptions options =
         TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
@@ -156,17 +194,35 @@ public class BigQueryHllSketchCompatibilityIT {
                     .withCoder(ByteArrayCoder.of()))
             .apply(HllCount.MergePartial.globally()) // no-op, only for testing MergePartial
             .apply(HllCount.Extract.globally());
-    PAssert.thatSingleton(result).isEqualTo(EXPECTED_COUNT);
+    PAssert.thatSingleton(result).isEqualTo(expectedCount);
     p.run().waitUntilFinish();
   }
 
   /**
-   * Test that HLL++ sketch computed in Beam can be processed by BigQuery. Hll sketch is computed by
-   * {@link HllCount.Init} in Beam and written to BigQuery; the test verifies that we can run {@code
-   * HLL_COUNT.EXTRACT()} on the sketch in BigQuery to get the correct estimated count.
+   * Tests that a non-empty HLL++ sketch computed in Beam can be processed by BigQuery.
+   *
+   * <p>The Hll sketch is computed by {@link HllCount.Init} in Beam and written to BigQuery; the
+   * test verifies that we can run {@code HLL_COUNT.EXTRACT()} on the sketch in BigQuery to get the
+   * correct estimated count.
+   */
+  @Test
+  public void testWriteNonEmptySketchToBigQuery() {
+    writeSketchToBigQuery(TEST_DATA, EXPECTED_CHECKSUM_NON_EMPTY);
+  }
+
+  /**
+   * Tests that an empty HLL++ sketch computed in Beam can be processed by BigQuery.
+   *
+   * <p>The Hll sketch is computed by {@link HllCount.Init} in Beam and written to BigQuery; the
+   * test verifies that we can run {@code HLL_COUNT.EXTRACT()} on the sketch in BigQuery to get the
+   * correct estimated count.
    */
   @Test
-  public void testWriteSketchToBigQuery() {
+  public void testWriteEmptySketchToBigQuery() {
+    writeSketchToBigQuery(Collections.emptyList(), EXPECTED_CHECKSUM_EMPTY);
+  }
+
+  private void writeSketchToBigQuery(List<String> testData, String expectedChecksum) {
     String tableSpec = String.format("%s.%s", DATASET_ID, SKETCH_TABLE_ID);
     String query =
         String.format("SELECT HLL_COUNT.EXTRACT(%s) FROM %s", SKETCH_FIELD_NAME, tableSpec);
@@ -181,16 +237,20 @@ public class BigQueryHllSketchCompatibilityIT {
     // After the pipeline finishes, BigqueryMatcher will send a query to retrieve the estimated
     // count and verifies its correctness using checksum.
     options.setOnSuccessMatcher(
-        BigqueryMatcher.createUsingStandardSql(APP_NAME, PROJECT_ID, query, EXPECTED_CHECKSUM));
+        BigqueryMatcher.createUsingStandardSql(APP_NAME, PROJECT_ID, query, expectedChecksum));
 
     Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(TEST_DATA))
+    p.apply(Create.of(testData).withType(TypeDescriptor.of(String.class)))
         .apply(HllCount.Init.forStrings().globally())
         .apply(
             BigQueryIO.<byte[]>write()
                 .to(tableSpec)
                 .withSchema(tableSchema)
-                .withFormatFunction(sketch -> new TableRow().set(SKETCH_FIELD_NAME, sketch))
+                .withFormatFunction(
+                    sketch ->
+                        // Empty sketch is represented by empty byte array in Beam and by null in
+                        // BigQuery
+                        new TableRow().set(SKETCH_FIELD_NAME, sketch.length == 0 ? null : sketch))
                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
     p.run().waitUntilFinish();
   }
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java
index 4ef3e6a..137e976 100644
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java
+++ b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/HllCountTest.java
@@ -17,8 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.zetasketch;
 
+import static org.junit.Assert.assertArrayEquals;
+
 import com.google.zetasketch.HyperLogLogPlusPlus;
 import com.google.zetasketch.shaded.com.google.protobuf.ByteString;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -484,4 +487,15 @@ public class HllCountTest {
     PAssert.thatSingleton(result).isEqualTo(KV.of("k", 0L));
     p.run();
   }
+
+  @Test
+  public void testGetSketchFromByteBufferForEmptySketch() {
+    assertArrayEquals(EMPTY_SKETCH, HllCount.getSketchFromByteBuffer(null));
+  }
+
+  @Test
+  public void testGetSketchFromByteBufferForNonEmptySketch() {
+    ByteBuffer bf = ByteBuffer.wrap(INTS1_SKETCH);
+    assertArrayEquals(INTS1_SKETCH, HllCount.getSketchFromByteBuffer(bf));
+  }
 }