You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ra...@apache.org on 2021/01/05 07:26:16 UTC

[beam] 01/01: Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl"

This is an automated email from the ASF dual-hosted git repository.

rarokni pushed a commit to branch revert-12973-hll
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c7d80a855ea7b1897ab24b2d4a1db126c0ddbc7f
Author: Reza Rokni <75...@users.noreply.github.com>
AuthorDate: Tue Jan 5 15:25:31 2021 +0800

    Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl"
---
 .../beam/sdk/transforms/ApproximateUnique.java     |  20 +-
 .../zetasketch/ApproximateCountDistinct.java       | 288 -----------------
 .../zetasketch/ApproximateCountDistinctTest.java   | 342 ---------------------
 3 files changed, 9 insertions(+), 641 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 760883a..c943084 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -43,22 +43,20 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or
  * the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
  *
- * @deprecated
- *     <p>Consider using {@code ApproximateCountDistinct} in the {@code zetasketch} extension
- *     module, which makes use of the {@code HllCount} implementation.
- *     <p>If {@code ApproximateCountDistinct} does not meet your needs then you can directly use
- *     {@code HllCount}. Direct usage will also give you access to save intermediate aggregation
- *     result into a sketch for later processing.
- *     <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}:
- *     <pre>{@code
+ * <p>Consider using {@code HllCount} in the {@code zetasketch} extension module if you need better
+ * performance or need to save intermediate aggregation result into a sketch for later processing.
+ *
+ * <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}:
+ *
+ * <pre>{@code
  * PCollection<String> input = ...;
  * PCollection<Long> countDistinct =
  *     input.apply(HllCount.Init.forStrings().globally()).apply(HllCount.Extract.globally());
  * }</pre>
- *     For more details about using {@code HllCount} and the {@code zetasketch} extension module,
- *     see https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7.
+ *
+ * For more details about using {@code HllCount} and the {@code zetasketch} extension module, see
+ * https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7.
  */
-@Deprecated
 public class ApproximateUnique {
 
   /**
diff --git a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java
deleted file mode 100644
index 9b9daf5..0000000
--- a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.zetasketch;
-
-import com.google.auto.value.AutoValue;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.zetasketch.HllCount.Init.Builder;
-import org.apache.beam.sdk.transforms.Contextful;
-import org.apache.beam.sdk.transforms.Contextful.Fn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ProcessFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or
- * the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
- *
- * <p>We make use of the {@link HllCount} implementation for this transform. Please use {@link
- * HllCount} directly if you need access to the sketches.
- *
- * <p>If the object is not one of {@link Byte[]} {@link Integer} {@link Double} {@link String} make
- * use of {@link Globally#via} or {@link PerKey#via}
- *
- * <h3>Examples</h3>
- *
- * <h4>Example 1: Approximate Count of Ints {@code PCollection<Integer>} and specify precision</h4>
- *
- * <pre>{@code
- * p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.globally()
- *   .withPercision(PRECISION));
- *
- * }</pre>
- *
- * <h4>Example 2: Approximate Count of Key Value {@code PCollection<KV<Integer,Foo>>}</h4>
- *
- * <pre>{@code
- * PCollection<KV<Integer, Long>> result =
- *   p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey());
- *
- * }</pre>
- *
- * <h4>Example 3: Approximate Count of Key Value {@code PCollection<KV<Integer,Foo>>}</h4>
- *
- * <pre>{@code
- * PCollection<KV<Integer, Foo>> approxResultInteger =
- *   p.apply("Int", Create.of(Foo))
- *     .apply("IntHLL", ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey()
- *       .via(kv -> KV.of(kv.getKey(), (long) kv.getValue().hashCode())));
- * }</pre>
- */
-@Experimental
-public class ApproximateCountDistinct {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ApproximateCountDistinct.class);
-
-  private static final List<TypeDescriptor<?>> HLL_IMPLEMENTED_TYPES =
-      ImmutableList.of(
-          TypeDescriptors.strings(),
-          TypeDescriptors.longs(),
-          TypeDescriptors.integers(),
-          new TypeDescriptor<byte[]>() {});
-
-  public static <T> Globally<T> globally() {
-    return new AutoValue_ApproximateCountDistinct_Globally.Builder<T>()
-        .setPrecision(HllCount.DEFAULT_PRECISION)
-        .build();
-  }
-
-  public static <K, V> PerKey<K, V> perKey() {
-    return new AutoValue_ApproximateCountDistinct_PerKey.Builder<K, V>()
-        .setPrecision(HllCount.DEFAULT_PRECISION)
-        .build();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * {@code PTransform} for estimating the number of distinct elements in a {@code PCollection}.
-   *
-   * @param <T> the type of the elements in the input {@code PCollection}
-   */
-  @AutoValue
-  public abstract static class Globally<T> extends PTransform<PCollection<T>, PCollection<Long>> {
-
-    public abstract int getPrecision();
-
-    public abstract Builder<T> toBuilder();
-
-    @Nullable
-    public abstract Contextful<Fn<T, Long>> getMapping();
-
-    @AutoValue.Builder
-    public abstract static class Builder<T> {
-
-      public abstract Builder<T> setPrecision(int precision);
-
-      public abstract Builder<T> setMapping(Contextful<Fn<T, Long>> value);
-
-      public abstract Globally<T> build();
-    }
-
-    public Globally<T> via(ProcessFunction<T, Long> fn) {
-
-      return toBuilder().setMapping(Contextful.<T, Long>fn(fn)).build();
-    }
-
-    public <V> Globally<V> withPercision(Integer withPercision) {
-      @SuppressWarnings("unchecked")
-      Globally<V> globally = (Globally<V>) toBuilder().setPrecision(withPercision).build();
-      return globally;
-    }
-
-    @Override
-    public PCollection<Long> expand(PCollection<T> input) {
-
-      TypeDescriptor<T> type = input.getCoder().getEncodedTypeDescriptor();
-
-      if (HLL_IMPLEMENTED_TYPES.contains(type)) {
-
-        HllCount.Init.Builder<T> builder = builderForType(type);
-
-        return input.apply(builder.globally()).apply(HllCount.Extract.globally());
-      }
-
-      // Boiler plate to avoid  [argument.type.incompatible] NonNull vs Nullable
-      Contextful<Fn<T, Long>> mapping = getMapping();
-
-      if (mapping != null) {
-        return input
-            .apply(MapElements.into(TypeDescriptors.longs()).via(mapping))
-            .apply(HllCount.Init.forLongs().globally())
-            .apply(HllCount.Extract.globally());
-      }
-
-      throw new IllegalArgumentException(
-          String.format(
-              "%s supports Integer,"
-                  + " Long, String and byte[] objects directly. For other types you must provide a Mapping function.",
-              this.getClass().getCanonicalName()));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      ApproximateCountDistinct.populateDisplayData(builder, getPrecision());
-    }
-  }
-
-  @AutoValue
-  public abstract static class PerKey<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> {
-
-    public abstract Integer getPrecision();
-
-    @Nullable
-    public abstract Contextful<Fn<KV<K, V>, KV<K, Long>>> getMapping();
-
-    public abstract Builder<K, V> toBuilder();
-
-    @AutoValue.Builder
-    public abstract static class Builder<K, V> {
-
-      public abstract Builder<K, V> setPrecision(Integer precision);
-
-      public abstract Builder<K, V> setMapping(Contextful<Fn<KV<K, V>, KV<K, Long>>> value);
-
-      public abstract PerKey<K, V> build();
-    }
-
-    public <K2, V2> PerKey<K2, V2> withPercision(Integer withPercision) {
-      // Work around for loss of type inference when using API.
-      @SuppressWarnings("unchecked")
-      PerKey<K2, V2> perKey = (PerKey<K2, V2>) this.toBuilder().setPrecision(withPercision).build();
-      return perKey;
-    }
-
-    public PerKey<K, V> via(ProcessFunction<KV<K, V>, KV<K, Long>> fn) {
-
-      return this.toBuilder().setMapping(Contextful.<KV<K, V>, KV<K, Long>>fn(fn)).build();
-    }
-
-    @Override
-    public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) {
-
-      Coder<V> coder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
-
-      TypeDescriptor<V> type = coder.getEncodedTypeDescriptor();
-
-      if (HLL_IMPLEMENTED_TYPES.contains(type)) {
-
-        HllCount.Init.Builder<V> builder = builderForType(type);
-
-        return input.apply(builder.perKey()).apply(HllCount.Extract.perKey());
-      }
-
-      // Boiler plate to avoid  [argument.type.incompatible] NonNull vs Nullable
-      Contextful<Fn<KV<K, V>, KV<K, Long>>> mapping = getMapping();
-
-      if (mapping != null) {
-        Coder<K> keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
-        return input
-            .apply(
-                MapElements.into(
-                        TypeDescriptors.kvs(
-                            keyCoder.getEncodedTypeDescriptor(), TypeDescriptors.longs()))
-                    .via(mapping))
-            .apply(HllCount.Init.forLongs().perKey())
-            .apply(HllCount.Extract.perKey());
-      }
-
-      throw new IllegalArgumentException(
-          String.format(
-              "%s supports Integer,"
-                  + " Long, String and byte[] objects directly not for %s type, you must provide a Mapping use via.",
-              this.getClass().getCanonicalName(), type.toString()));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      ApproximateCountDistinct.populateDisplayData(builder, getPrecision());
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static void populateDisplayData(DisplayData.Builder builder, Integer precision) {
-    builder.add(DisplayData.item("precision", precision).withLabel("Precision"));
-  }
-
-  // HLLCount supports, Long, Integers, String and Byte primitives.
-  // We will return an appropriate builder
-  protected static <T> Builder<T> builderForType(TypeDescriptor<T> input) {
-
-    @SuppressWarnings("rawtypes")
-    HllCount.Init.Builder builder = null;
-
-    if (input.equals(TypeDescriptors.strings())) {
-      builder = HllCount.Init.forStrings();
-    }
-    if (input.equals(TypeDescriptors.longs())) {
-      builder = HllCount.Init.forLongs();
-    }
-    if (input.equals(TypeDescriptors.integers())) {
-      builder = HllCount.Init.forIntegers();
-    }
-    if (input.equals(new TypeDescriptor<byte[]>() {})) {
-      builder = HllCount.Init.forBytes();
-    }
-
-    if (builder == null) {
-      throw new IllegalArgumentException(String.format("Type not supported %s", input));
-    }
-
-    // Safe to ignore warning, as we know the type based on the check we do above.
-    @SuppressWarnings("unchecked")
-    Builder<T> output = (Builder<T>) builder;
-
-    return output;
-  }
-}
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java
deleted file mode 100644
index 8796d83..0000000
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.zetasketch;
-
-import com.google.zetasketch.HyperLogLogPlusPlus;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/** Tests for {@link ApproximateCountDistinct}. */
-public class ApproximateCountDistinctTest {
-
-  @Rule public final transient TestPipeline p = TestPipeline.create();
-
-  // Integer
-  private static final List<Integer> INTS1 = Arrays.asList(1, 2, 3, 3, 1, 4);
-  private static final Long INTS1_ESTIMATE;
-
-  private static final int TEST_PRECISION = 20;
-
-  static {
-    HyperLogLogPlusPlus<Integer> hll = new HyperLogLogPlusPlus.Builder().buildForIntegers();
-    INTS1.forEach(hll::add);
-    INTS1_ESTIMATE = hll.longResult();
-  }
-
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testIntegerBuilder() {
-
-    PCollection<Integer> ints = p.apply(Create.of(1));
-    HllCount.Init.Builder<Integer> builder =
-        ApproximateCountDistinct.<Integer>builderForType(
-            ints.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = ints.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testStringBuilder() {
-
-    PCollection<String> strings = p.apply(Create.<String>of("43"));
-    HllCount.Init.Builder<String> builder =
-        ApproximateCountDistinct.<String>builderForType(
-            strings.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = strings.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testLongBuilder() {
-
-    PCollection<Long> longs = p.apply(Create.<Long>of(1L));
-    HllCount.Init.Builder<Long> builder =
-        ApproximateCountDistinct.<Long>builderForType(longs.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = longs.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testBytesBuilder() {
-
-    byte[] byteArray = new byte[] {'A'};
-    PCollection<byte[]> bytes = p.apply(Create.of(byteArray));
-    TypeDescriptor<byte[]> a = bytes.getCoder().getEncodedTypeDescriptor();
-    HllCount.Init.Builder<byte[]> builder =
-        ApproximateCountDistinct.<byte[]>builderForType(
-            bytes.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = bytes.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-
-  /** Test Integer Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForInteger() {
-    PCollection<Long> approxResultInteger =
-        p.apply("Int", Create.of(INTS1)).apply("IntHLL", ApproximateCountDistinct.globally());
-    PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-    p.run();
-  }
-
-  /** Test Long Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForLong() {
-
-    PCollection<Long> approxResultLong =
-        p.apply("Long", Create.of(INTS1.stream().map(Long::valueOf).collect(Collectors.toList())))
-            .apply("LongHLL", ApproximateCountDistinct.globally());
-
-    PAssert.thatSingleton(approxResultLong).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test String Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForStrings() {
-    PCollection<Long> approxResultString =
-        p.apply("Str", Create.of(INTS1.stream().map(String::valueOf).collect(Collectors.toList())))
-            .apply("StrHLL", ApproximateCountDistinct.globally());
-
-    PAssert.thatSingleton(approxResultString).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test Byte Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForBytes() {
-    PCollection<Long> approxResultByte =
-        p.apply(
-                "BytesHLL",
-                Create.of(
-                    INTS1.stream()
-                        .map(x -> ByteBuffer.allocate(4).putInt(x).array())
-                        .collect(Collectors.toList())))
-            .apply(ApproximateCountDistinct.globally());
-
-    PAssert.thatSingleton(approxResultByte).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test Integer Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForInteger() {
-
-    List<KV<Integer, Integer>> ints = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        ints.add(KV.of(i, k));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test Long Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForLong() {
-
-    List<KV<Integer, Long>> longs = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        longs.add(KV.of(i, (long) k));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test String Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForStrings() {
-    List<KV<Integer, String>> strings = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        strings.add(KV.of(i, String.valueOf(k)));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("Str", Create.of(strings)).apply("StrHLL", ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test Byte Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForBytes() {
-
-    List<KV<Integer, byte[]>> bytes = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        bytes.add(KV.of(i, ByteBuffer.allocate(4).putInt(k).array()));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("BytesHLL", Create.of(bytes)).apply(ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testObjectTypesGlobal() {
-
-    PCollection<Long> approxResultInteger =
-        p.apply(
-                "Int",
-                Create.of(
-                    INTS1.stream().map(x -> KV.of(x, KV.of(x, x))).collect(Collectors.toList())))
-            .apply(
-                "IntHLL",
-                ApproximateCountDistinct.<KV<Integer, KV<Integer, Integer>>>globally()
-                    .via((KV<Integer, KV<Integer, Integer>> x) -> (long) x.getValue().hashCode()));
-
-    PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testObjectTypesPerKey() {
-
-    List<KV<Integer, KV<Integer, Integer>>> ints = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        ints.add(KV.of(i, KV.of(i, k)));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> approxResultInteger =
-        p.apply("Int", Create.of(ints))
-            .apply(
-                "IntHLL",
-                ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey()
-                    .via(x -> KV.of(x.getKey(), (long) x.hashCode()))
-                    .withPercision(TEST_PRECISION));
-
-    PAssert.that(approxResultInteger)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testGlobalPercision() {
-
-    PCollection<Long> approxResultInteger =
-        p.apply("Int", Create.of(INTS1))
-            .apply("IntHLL", ApproximateCountDistinct.globally().withPercision(TEST_PRECISION));
-
-    PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testPerKeyPercision() {
-
-    List<KV<Integer, Integer>> ints = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        ints.add(KV.of(i, k));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> approxResultInteger =
-        p.apply("Int", Create.of(ints))
-            .apply("IntHLL", ApproximateCountDistinct.perKey().withPercision(TEST_PRECISION));
-
-    PAssert.that(approxResultInteger)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-}