You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ra...@apache.org on 2021/01/05 07:26:16 UTC
[beam] 01/01: Revert "[BEAM-10234] Create ApproximateDistinct using
HLL Impl"
This is an automated email from the ASF dual-hosted git repository.
rarokni pushed a commit to branch revert-12973-hll
in repository https://gitbox.apache.org/repos/asf/beam.git
commit c7d80a855ea7b1897ab24b2d4a1db126c0ddbc7f
Author: Reza Rokni <75...@users.noreply.github.com>
AuthorDate: Tue Jan 5 15:25:31 2021 +0800
Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl"
---
.../beam/sdk/transforms/ApproximateUnique.java | 20 +-
.../zetasketch/ApproximateCountDistinct.java | 288 -----------------
.../zetasketch/ApproximateCountDistinctTest.java | 342 ---------------------
3 files changed, 9 insertions(+), 641 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 760883a..c943084 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -43,22 +43,20 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or
* the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
*
- * @deprecated
- * <p>Consider using {@code ApproximateCountDistinct} in the {@code zetasketch} extension
- * module, which makes use of the {@code HllCount} implementation.
- * <p>If {@code ApproximateCountDistinct} does not meet your needs then you can directly use
- * {@code HllCount}. Direct usage will also give you access to save intermediate aggregation
- * result into a sketch for later processing.
- * <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}:
- * <pre>{@code
+ * <p>Consider using {@code HllCount} in the {@code zetasketch} extension module if you need better
+ * performance or need to save intermediate aggregation result into a sketch for later processing.
+ *
+ * <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}:
+ *
+ * <pre>{@code
* PCollection<String> input = ...;
* PCollection<Long> countDistinct =
* input.apply(HllCount.Init.forStrings().globally()).apply(HllCount.Extract.globally());
* }</pre>
- * For more details about using {@code HllCount} and the {@code zetasketch} extension module,
- * see https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7.
+ *
+ * For more details about using {@code HllCount} and the {@code zetasketch} extension module, see
+ * https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7.
*/
-@Deprecated
public class ApproximateUnique {
/**
diff --git a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java
deleted file mode 100644
index 9b9daf5..0000000
--- a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.zetasketch;
-
-import com.google.auto.value.AutoValue;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.zetasketch.HllCount.Init.Builder;
-import org.apache.beam.sdk.transforms.Contextful;
-import org.apache.beam.sdk.transforms.Contextful.Fn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ProcessFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or
- * the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
- *
- * <p>We make use of the {@link HllCount} implementation for this transform. Please use {@link
- * HllCount} directly if you need access to the sketches.
- *
- * <p>If the element type is not {@code byte[]}, {@link Integer}, {@link Long} or {@link String},
- * make use of {@link Globally#via} or {@link PerKey#via}.
- *
- * <h3>Examples</h3>
- *
- * <h4>Example 1: Approximate Count of Ints {@code PCollection<Integer>} and specify precision</h4>
- *
- * <pre>{@code
- * p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.globally()
- * .withPercision(PRECISION));
- *
- * }</pre>
- *
- * <h4>Example 2: Approximate Count of Key Value {@code PCollection<KV<Integer,Long>>}</h4>
- *
- * <pre>{@code
- * PCollection<KV<Integer, Long>> result =
- * p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey());
- *
- * }</pre>
- *
- * <h4>Example 3: Approximate Count of Key Value {@code PCollection<KV<Integer,Foo>>}</h4>
- *
- * <pre>{@code
- * PCollection<KV<Integer, Long>> approxResultInteger =
- * p.apply("Int", Create.of(Foo))
- * .apply("IntHLL", ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey()
- * .via(kv -> KV.of(kv.getKey(), (long) kv.getValue().hashCode())));
- * }</pre>
- */
-@Experimental
-public class ApproximateCountDistinct {
-
- private static final Logger LOG = LoggerFactory.getLogger(ApproximateCountDistinct.class);
-
- private static final List<TypeDescriptor<?>> HLL_IMPLEMENTED_TYPES =
- ImmutableList.of(
- TypeDescriptors.strings(),
- TypeDescriptors.longs(),
- TypeDescriptors.integers(),
- new TypeDescriptor<byte[]>() {});
-
- public static <T> Globally<T> globally() {
- return new AutoValue_ApproximateCountDistinct_Globally.Builder<T>()
- .setPrecision(HllCount.DEFAULT_PRECISION)
- .build();
- }
-
- public static <K, V> PerKey<K, V> perKey() {
- return new AutoValue_ApproximateCountDistinct_PerKey.Builder<K, V>()
- .setPrecision(HllCount.DEFAULT_PRECISION)
- .build();
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@code PTransform} for estimating the number of distinct elements in a {@code PCollection}.
- *
- * @param <T> the type of the elements in the input {@code PCollection}
- */
- @AutoValue
- public abstract static class Globally<T> extends PTransform<PCollection<T>, PCollection<Long>> {
-
- public abstract int getPrecision();
-
- public abstract Builder<T> toBuilder();
-
- @Nullable
- public abstract Contextful<Fn<T, Long>> getMapping();
-
- @AutoValue.Builder
- public abstract static class Builder<T> {
-
- public abstract Builder<T> setPrecision(int precision);
-
- public abstract Builder<T> setMapping(Contextful<Fn<T, Long>> value);
-
- public abstract Globally<T> build();
- }
-
- public Globally<T> via(ProcessFunction<T, Long> fn) {
-
- return toBuilder().setMapping(Contextful.<T, Long>fn(fn)).build();
- }
-
- public <V> Globally<V> withPercision(Integer withPercision) {
- @SuppressWarnings("unchecked")
- Globally<V> globally = (Globally<V>) toBuilder().setPrecision(withPercision).build();
- return globally;
- }
-
- @Override
- public PCollection<Long> expand(PCollection<T> input) {
-
- TypeDescriptor<T> type = input.getCoder().getEncodedTypeDescriptor();
-
- if (HLL_IMPLEMENTED_TYPES.contains(type)) {
-
- HllCount.Init.Builder<T> builder = builderForType(type);
-
- return input.apply(builder.globally()).apply(HllCount.Extract.globally());
- }
-
- // Boiler plate to avoid [argument.type.incompatible] NonNull vs Nullable
- Contextful<Fn<T, Long>> mapping = getMapping();
-
- if (mapping != null) {
- return input
- .apply(MapElements.into(TypeDescriptors.longs()).via(mapping))
- .apply(HllCount.Init.forLongs().globally())
- .apply(HllCount.Extract.globally());
- }
-
- throw new IllegalArgumentException(
- String.format(
- "%s supports Integer,"
- + " Long, String and byte[] objects directly. For other types you must provide a Mapping function.",
- this.getClass().getCanonicalName()));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- ApproximateCountDistinct.populateDisplayData(builder, getPrecision());
- }
- }
-
- @AutoValue
- public abstract static class PerKey<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> {
-
- public abstract Integer getPrecision();
-
- @Nullable
- public abstract Contextful<Fn<KV<K, V>, KV<K, Long>>> getMapping();
-
- public abstract Builder<K, V> toBuilder();
-
- @AutoValue.Builder
- public abstract static class Builder<K, V> {
-
- public abstract Builder<K, V> setPrecision(Integer precision);
-
- public abstract Builder<K, V> setMapping(Contextful<Fn<KV<K, V>, KV<K, Long>>> value);
-
- public abstract PerKey<K, V> build();
- }
-
- public <K2, V2> PerKey<K2, V2> withPercision(Integer withPercision) {
- // Work around for loss of type inference when using API.
- @SuppressWarnings("unchecked")
- PerKey<K2, V2> perKey = (PerKey<K2, V2>) this.toBuilder().setPrecision(withPercision).build();
- return perKey;
- }
-
- public PerKey<K, V> via(ProcessFunction<KV<K, V>, KV<K, Long>> fn) {
-
- return this.toBuilder().setMapping(Contextful.<KV<K, V>, KV<K, Long>>fn(fn)).build();
- }
-
- @Override
- public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) {
-
- Coder<V> coder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
-
- TypeDescriptor<V> type = coder.getEncodedTypeDescriptor();
-
- if (HLL_IMPLEMENTED_TYPES.contains(type)) {
-
- HllCount.Init.Builder<V> builder = builderForType(type);
-
- return input.apply(builder.perKey()).apply(HllCount.Extract.perKey());
- }
-
- // Boiler plate to avoid [argument.type.incompatible] NonNull vs Nullable
- Contextful<Fn<KV<K, V>, KV<K, Long>>> mapping = getMapping();
-
- if (mapping != null) {
- Coder<K> keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
- return input
- .apply(
- MapElements.into(
- TypeDescriptors.kvs(
- keyCoder.getEncodedTypeDescriptor(), TypeDescriptors.longs()))
- .via(mapping))
- .apply(HllCount.Init.forLongs().perKey())
- .apply(HllCount.Extract.perKey());
- }
-
- throw new IllegalArgumentException(
- String.format(
- "%s supports Integer,"
- + " Long, String and byte[] objects directly not for %s type, you must provide a Mapping use via.",
- this.getClass().getCanonicalName(), type.toString()));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- ApproximateCountDistinct.populateDisplayData(builder, getPrecision());
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static void populateDisplayData(DisplayData.Builder builder, Integer precision) {
- builder.add(DisplayData.item("precision", precision).withLabel("Precision"));
- }
-
- // HllCount supports Long, Integer, String and byte[] values.
- // We will return an appropriate builder
- protected static <T> Builder<T> builderForType(TypeDescriptor<T> input) {
-
- @SuppressWarnings("rawtypes")
- HllCount.Init.Builder builder = null;
-
- if (input.equals(TypeDescriptors.strings())) {
- builder = HllCount.Init.forStrings();
- }
- if (input.equals(TypeDescriptors.longs())) {
- builder = HllCount.Init.forLongs();
- }
- if (input.equals(TypeDescriptors.integers())) {
- builder = HllCount.Init.forIntegers();
- }
- if (input.equals(new TypeDescriptor<byte[]>() {})) {
- builder = HllCount.Init.forBytes();
- }
-
- if (builder == null) {
- throw new IllegalArgumentException(String.format("Type not supported %s", input));
- }
-
- // Safe to ignore warning, as we know the type based on the check we do above.
- @SuppressWarnings("unchecked")
- Builder<T> output = (Builder<T>) builder;
-
- return output;
- }
-}
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java
deleted file mode 100644
index 8796d83..0000000
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.zetasketch;
-
-import com.google.zetasketch.HyperLogLogPlusPlus;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/** Tests for {@link ApproximateCountDistinct}. */
-public class ApproximateCountDistinctTest {
-
- @Rule public final transient TestPipeline p = TestPipeline.create();
-
- // Integer
- private static final List<Integer> INTS1 = Arrays.asList(1, 2, 3, 3, 1, 4);
- private static final Long INTS1_ESTIMATE;
-
- private static final int TEST_PRECISION = 20;
-
- static {
- HyperLogLogPlusPlus<Integer> hll = new HyperLogLogPlusPlus.Builder().buildForIntegers();
- INTS1.forEach(hll::add);
- INTS1_ESTIMATE = hll.longResult();
- }
-
- /** Test correct Builder is returned from Generic type. * */
- @Test
- public void testIntegerBuilder() {
-
- PCollection<Integer> ints = p.apply(Create.of(1));
- HllCount.Init.Builder<Integer> builder =
- ApproximateCountDistinct.<Integer>builderForType(
- ints.getCoder().getEncodedTypeDescriptor());
- PCollection<Long> result = ints.apply(builder.globally()).apply(HllCount.Extract.globally());
- PAssert.that(result).containsInAnyOrder(1L);
- p.run();
- }
- /** Test correct Builder is returned from Generic type. * */
- @Test
- public void testStringBuilder() {
-
- PCollection<String> strings = p.apply(Create.<String>of("43"));
- HllCount.Init.Builder<String> builder =
- ApproximateCountDistinct.<String>builderForType(
- strings.getCoder().getEncodedTypeDescriptor());
- PCollection<Long> result = strings.apply(builder.globally()).apply(HllCount.Extract.globally());
- PAssert.that(result).containsInAnyOrder(1L);
- p.run();
- }
- /** Test correct Builder is returned from Generic type. * */
- @Test
- public void testLongBuilder() {
-
- PCollection<Long> longs = p.apply(Create.<Long>of(1L));
- HllCount.Init.Builder<Long> builder =
- ApproximateCountDistinct.<Long>builderForType(longs.getCoder().getEncodedTypeDescriptor());
- PCollection<Long> result = longs.apply(builder.globally()).apply(HllCount.Extract.globally());
- PAssert.that(result).containsInAnyOrder(1L);
- p.run();
- }
- /** Test correct Builder is returned from Generic type. * */
- @Test
- public void testBytesBuilder() {
-
- byte[] byteArray = new byte[] {'A'};
- PCollection<byte[]> bytes = p.apply(Create.of(byteArray));
- TypeDescriptor<byte[]> a = bytes.getCoder().getEncodedTypeDescriptor();
- HllCount.Init.Builder<byte[]> builder =
- ApproximateCountDistinct.<byte[]>builderForType(
- bytes.getCoder().getEncodedTypeDescriptor());
- PCollection<Long> result = bytes.apply(builder.globally()).apply(HllCount.Extract.globally());
- PAssert.that(result).containsInAnyOrder(1L);
- p.run();
- }
-
- /** Test Integer Globally. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesGlobalForInteger() {
- PCollection<Long> approxResultInteger =
- p.apply("Int", Create.of(INTS1)).apply("IntHLL", ApproximateCountDistinct.globally());
- PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
- p.run();
- }
-
- /** Test Long Globally. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesGlobalForLong() {
-
- PCollection<Long> approxResultLong =
- p.apply("Long", Create.of(INTS1.stream().map(Long::valueOf).collect(Collectors.toList())))
- .apply("LongHLL", ApproximateCountDistinct.globally());
-
- PAssert.thatSingleton(approxResultLong).isEqualTo(INTS1_ESTIMATE);
-
- p.run();
- }
-
- /** Test String Globally. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesGlobalForStrings() {
- PCollection<Long> approxResultString =
- p.apply("Str", Create.of(INTS1.stream().map(String::valueOf).collect(Collectors.toList())))
- .apply("StrHLL", ApproximateCountDistinct.globally());
-
- PAssert.thatSingleton(approxResultString).isEqualTo(INTS1_ESTIMATE);
-
- p.run();
- }
-
- /** Test Byte Globally. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesGlobalForBytes() {
- PCollection<Long> approxResultByte =
- p.apply(
- "BytesHLL",
- Create.of(
- INTS1.stream()
- .map(x -> ByteBuffer.allocate(4).putInt(x).array())
- .collect(Collectors.toList())))
- .apply(ApproximateCountDistinct.globally());
-
- PAssert.thatSingleton(approxResultByte).isEqualTo(INTS1_ESTIMATE);
-
- p.run();
- }
-
- /** Test Integer per key. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesPerKeyForInteger() {
-
- List<KV<Integer, Integer>> ints = new ArrayList<>();
-
- for (int i = 0; i < 3; i++) {
- for (int k : INTS1) {
- ints.add(KV.of(i, k));
- }
- }
-
- PCollection<KV<Integer, Long>> result =
- p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.perKey());
-
- PAssert.that(result)
- .containsInAnyOrder(
- ImmutableList.of(
- KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
- p.run();
- }
-
- /** Test Long per key. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesPerKeyForLong() {
-
- List<KV<Integer, Long>> longs = new ArrayList<>();
-
- for (int i = 0; i < 3; i++) {
- for (int k : INTS1) {
- longs.add(KV.of(i, (long) k));
- }
- }
-
- PCollection<KV<Integer, Long>> result =
- p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey());
-
- PAssert.that(result)
- .containsInAnyOrder(
- ImmutableList.of(
- KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
- p.run();
- }
-
- /** Test String per key. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesPerKeyForStrings() {
- List<KV<Integer, String>> strings = new ArrayList<>();
-
- for (int i = 0; i < 3; i++) {
- for (int k : INTS1) {
- strings.add(KV.of(i, String.valueOf(k)));
- }
- }
-
- PCollection<KV<Integer, Long>> result =
- p.apply("Str", Create.of(strings)).apply("StrHLL", ApproximateCountDistinct.perKey());
-
- PAssert.that(result)
- .containsInAnyOrder(
- ImmutableList.of(
- KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
- p.run();
- }
-
- /** Test Byte per key. */
- @Test
- @Category(NeedsRunner.class)
- public void testStandardTypesPerKeyForBytes() {
-
- List<KV<Integer, byte[]>> bytes = new ArrayList<>();
-
- for (int i = 0; i < 3; i++) {
- for (int k : INTS1) {
- bytes.add(KV.of(i, ByteBuffer.allocate(4).putInt(k).array()));
- }
- }
-
- PCollection<KV<Integer, Long>> result =
- p.apply("BytesHLL", Create.of(bytes)).apply(ApproximateCountDistinct.perKey());
-
- PAssert.that(result)
- .containsInAnyOrder(
- ImmutableList.of(
- KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
- p.run();
- }
-
- /** Test a general object, we will make use of a KV as the object as it already has a coder. */
- @Test
- @Category(NeedsRunner.class)
- public void testObjectTypesGlobal() {
-
- PCollection<Long> approxResultInteger =
- p.apply(
- "Int",
- Create.of(
- INTS1.stream().map(x -> KV.of(x, KV.of(x, x))).collect(Collectors.toList())))
- .apply(
- "IntHLL",
- ApproximateCountDistinct.<KV<Integer, KV<Integer, Integer>>>globally()
- .via((KV<Integer, KV<Integer, Integer>> x) -> (long) x.getValue().hashCode()));
-
- PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-
- p.run();
- }
-
- /** Test a general object, we will make use of a KV as the object as it already has a coder. */
- @Test
- @Category(NeedsRunner.class)
- public void testObjectTypesPerKey() {
-
- List<KV<Integer, KV<Integer, Integer>>> ints = new ArrayList<>();
-
- for (int i = 0; i < 3; i++) {
- for (int k : INTS1) {
- ints.add(KV.of(i, KV.of(i, k)));
- }
- }
-
- PCollection<KV<Integer, Long>> approxResultInteger =
- p.apply("Int", Create.of(ints))
- .apply(
- "IntHLL",
- ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey()
- .via(x -> KV.of(x.getKey(), (long) x.hashCode()))
- .withPercision(TEST_PRECISION));
-
- PAssert.that(approxResultInteger)
- .containsInAnyOrder(
- ImmutableList.of(
- KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
- p.run();
- }
-
- /** Test that a custom precision can be set on the global transform. */
- @Test
- @Category(NeedsRunner.class)
- public void testGlobalPercision() {
-
- PCollection<Long> approxResultInteger =
- p.apply("Int", Create.of(INTS1))
- .apply("IntHLL", ApproximateCountDistinct.globally().withPercision(TEST_PRECISION));
-
- PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-
- p.run();
- }
-
- /** Test that a custom precision can be set on the per-key transform. */
- @Test
- @Category(NeedsRunner.class)
- public void testPerKeyPercision() {
-
- List<KV<Integer, Integer>> ints = new ArrayList<>();
-
- for (int i = 0; i < 3; i++) {
- for (int k : INTS1) {
- ints.add(KV.of(i, k));
- }
- }
-
- PCollection<KV<Integer, Long>> approxResultInteger =
- p.apply("Int", Create.of(ints))
- .apply("IntHLL", ApproximateCountDistinct.perKey().withPercision(TEST_PRECISION));
-
- PAssert.that(approxResultInteger)
- .containsInAnyOrder(
- ImmutableList.of(
- KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
- p.run();
- }
-}