You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ra...@apache.org on 2021/01/05 07:26:15 UTC

[beam] branch revert-12973-hll created (now c7d80a8)

This is an automated email from the ASF dual-hosted git repository.

rarokni pushed a change to branch revert-12973-hll
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at c7d80a8  Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl"

This branch includes the following new commits:

     new c7d80a8  Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl"

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rarokni pushed a commit to branch revert-12973-hll
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c7d80a855ea7b1897ab24b2d4a1db126c0ddbc7f
Author: Reza Rokni <75...@users.noreply.github.com>
AuthorDate: Tue Jan 5 15:25:31 2021 +0800

    Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl"
---
 .../beam/sdk/transforms/ApproximateUnique.java     |  20 +-
 .../zetasketch/ApproximateCountDistinct.java       | 288 -----------------
 .../zetasketch/ApproximateCountDistinctTest.java   | 342 ---------------------
 3 files changed, 9 insertions(+), 641 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 760883a..c943084 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -43,22 +43,20 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or
  * the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
  *
- * @deprecated
- *     <p>Consider using {@code ApproximateCountDistinct} in the {@code zetasketch} extension
- *     module, which makes use of the {@code HllCount} implementation.
- *     <p>If {@code ApproximateCountDistinct} does not meet your needs then you can directly use
- *     {@code HllCount}. Direct usage will also give you access to save intermediate aggregation
- *     result into a sketch for later processing.
- *     <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}:
- *     <pre>{@code
+ * <p>Consider using {@code HllCount} in the {@code zetasketch} extension module if you need better
+ * performance or need to save intermediate aggregation result into a sketch for later processing.
+ *
+ * <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}:
+ *
+ * <pre>{@code
  * PCollection<String> input = ...;
  * PCollection<Long> countDistinct =
  *     input.apply(HllCount.Init.forStrings().globally()).apply(HllCount.Extract.globally());
  * }</pre>
- *     For more details about using {@code HllCount} and the {@code zetasketch} extension module,
- *     see https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7.
+ *
+ * For more details about using {@code HllCount} and the {@code zetasketch} extension module, see
+ * https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7.
  */
-@Deprecated
 public class ApproximateUnique {
 
   /**
diff --git a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java
deleted file mode 100644
index 9b9daf5..0000000
--- a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.zetasketch;
-
-import com.google.auto.value.AutoValue;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.zetasketch.HllCount.Init.Builder;
-import org.apache.beam.sdk.transforms.Contextful;
-import org.apache.beam.sdk.transforms.Contextful.Fn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ProcessFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or
- * the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
- *
- * <p>We make use of the {@link HllCount} implementation for this transform. Please use {@link
- * HllCount} directly if you need access to the sketches.
- *
- * <p>If the object is not one of {@link Byte[]} {@link Integer} {@link Double} {@link String} make
- * use of {@link Globally#via} or {@link PerKey#via}
- *
- * <h3>Examples</h3>
- *
- * <h4>Example 1: Approximate Count of Ints {@code PCollection<Integer>} and specify precision</h4>
- *
- * <pre>{@code
- * p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.globally()
- *   .withPercision(PRECISION));
- *
- * }</pre>
- *
- * <h4>Example 2: Approximate Count of Key Value {@code PCollection<KV<Integer,Foo>>}</h4>
- *
- * <pre>{@code
- * PCollection<KV<Integer, Long>> result =
- *   p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey());
- *
- * }</pre>
- *
- * <h4>Example 3: Approximate Count of Key Value {@code PCollection<KV<Integer,Foo>>}</h4>
- *
- * <pre>{@code
- * PCollection<KV<Integer, Foo>> approxResultInteger =
- *   p.apply("Int", Create.of(Foo))
- *     .apply("IntHLL", ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey()
- *       .via(kv -> KV.of(kv.getKey(), (long) kv.getValue().hashCode())));
- * }</pre>
- */
-@Experimental
-public class ApproximateCountDistinct {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ApproximateCountDistinct.class);
-
-  private static final List<TypeDescriptor<?>> HLL_IMPLEMENTED_TYPES =
-      ImmutableList.of(
-          TypeDescriptors.strings(),
-          TypeDescriptors.longs(),
-          TypeDescriptors.integers(),
-          new TypeDescriptor<byte[]>() {});
-
-  public static <T> Globally<T> globally() {
-    return new AutoValue_ApproximateCountDistinct_Globally.Builder<T>()
-        .setPrecision(HllCount.DEFAULT_PRECISION)
-        .build();
-  }
-
-  public static <K, V> PerKey<K, V> perKey() {
-    return new AutoValue_ApproximateCountDistinct_PerKey.Builder<K, V>()
-        .setPrecision(HllCount.DEFAULT_PRECISION)
-        .build();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * {@code PTransform} for estimating the number of distinct elements in a {@code PCollection}.
-   *
-   * @param <T> the type of the elements in the input {@code PCollection}
-   */
-  @AutoValue
-  public abstract static class Globally<T> extends PTransform<PCollection<T>, PCollection<Long>> {
-
-    public abstract int getPrecision();
-
-    public abstract Builder<T> toBuilder();
-
-    @Nullable
-    public abstract Contextful<Fn<T, Long>> getMapping();
-
-    @AutoValue.Builder
-    public abstract static class Builder<T> {
-
-      public abstract Builder<T> setPrecision(int precision);
-
-      public abstract Builder<T> setMapping(Contextful<Fn<T, Long>> value);
-
-      public abstract Globally<T> build();
-    }
-
-    public Globally<T> via(ProcessFunction<T, Long> fn) {
-
-      return toBuilder().setMapping(Contextful.<T, Long>fn(fn)).build();
-    }
-
-    public <V> Globally<V> withPercision(Integer withPercision) {
-      @SuppressWarnings("unchecked")
-      Globally<V> globally = (Globally<V>) toBuilder().setPrecision(withPercision).build();
-      return globally;
-    }
-
-    @Override
-    public PCollection<Long> expand(PCollection<T> input) {
-
-      TypeDescriptor<T> type = input.getCoder().getEncodedTypeDescriptor();
-
-      if (HLL_IMPLEMENTED_TYPES.contains(type)) {
-
-        HllCount.Init.Builder<T> builder = builderForType(type);
-
-        return input.apply(builder.globally()).apply(HllCount.Extract.globally());
-      }
-
-      // Boiler plate to avoid  [argument.type.incompatible] NonNull vs Nullable
-      Contextful<Fn<T, Long>> mapping = getMapping();
-
-      if (mapping != null) {
-        return input
-            .apply(MapElements.into(TypeDescriptors.longs()).via(mapping))
-            .apply(HllCount.Init.forLongs().globally())
-            .apply(HllCount.Extract.globally());
-      }
-
-      throw new IllegalArgumentException(
-          String.format(
-              "%s supports Integer,"
-                  + " Long, String and byte[] objects directly. For other types you must provide a Mapping function.",
-              this.getClass().getCanonicalName()));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      ApproximateCountDistinct.populateDisplayData(builder, getPrecision());
-    }
-  }
-
-  @AutoValue
-  public abstract static class PerKey<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> {
-
-    public abstract Integer getPrecision();
-
-    @Nullable
-    public abstract Contextful<Fn<KV<K, V>, KV<K, Long>>> getMapping();
-
-    public abstract Builder<K, V> toBuilder();
-
-    @AutoValue.Builder
-    public abstract static class Builder<K, V> {
-
-      public abstract Builder<K, V> setPrecision(Integer precision);
-
-      public abstract Builder<K, V> setMapping(Contextful<Fn<KV<K, V>, KV<K, Long>>> value);
-
-      public abstract PerKey<K, V> build();
-    }
-
-    public <K2, V2> PerKey<K2, V2> withPercision(Integer withPercision) {
-      // Work around for loss of type inference when using API.
-      @SuppressWarnings("unchecked")
-      PerKey<K2, V2> perKey = (PerKey<K2, V2>) this.toBuilder().setPrecision(withPercision).build();
-      return perKey;
-    }
-
-    public PerKey<K, V> via(ProcessFunction<KV<K, V>, KV<K, Long>> fn) {
-
-      return this.toBuilder().setMapping(Contextful.<KV<K, V>, KV<K, Long>>fn(fn)).build();
-    }
-
-    @Override
-    public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) {
-
-      Coder<V> coder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
-
-      TypeDescriptor<V> type = coder.getEncodedTypeDescriptor();
-
-      if (HLL_IMPLEMENTED_TYPES.contains(type)) {
-
-        HllCount.Init.Builder<V> builder = builderForType(type);
-
-        return input.apply(builder.perKey()).apply(HllCount.Extract.perKey());
-      }
-
-      // Boiler plate to avoid  [argument.type.incompatible] NonNull vs Nullable
-      Contextful<Fn<KV<K, V>, KV<K, Long>>> mapping = getMapping();
-
-      if (mapping != null) {
-        Coder<K> keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
-        return input
-            .apply(
-                MapElements.into(
-                        TypeDescriptors.kvs(
-                            keyCoder.getEncodedTypeDescriptor(), TypeDescriptors.longs()))
-                    .via(mapping))
-            .apply(HllCount.Init.forLongs().perKey())
-            .apply(HllCount.Extract.perKey());
-      }
-
-      throw new IllegalArgumentException(
-          String.format(
-              "%s supports Integer,"
-                  + " Long, String and byte[] objects directly not for %s type, you must provide a Mapping use via.",
-              this.getClass().getCanonicalName(), type.toString()));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      ApproximateCountDistinct.populateDisplayData(builder, getPrecision());
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static void populateDisplayData(DisplayData.Builder builder, Integer precision) {
-    builder.add(DisplayData.item("precision", precision).withLabel("Precision"));
-  }
-
-  // HLLCount supports, Long, Integers, String and Byte primitives.
-  // We will return an appropriate builder
-  protected static <T> Builder<T> builderForType(TypeDescriptor<T> input) {
-
-    @SuppressWarnings("rawtypes")
-    HllCount.Init.Builder builder = null;
-
-    if (input.equals(TypeDescriptors.strings())) {
-      builder = HllCount.Init.forStrings();
-    }
-    if (input.equals(TypeDescriptors.longs())) {
-      builder = HllCount.Init.forLongs();
-    }
-    if (input.equals(TypeDescriptors.integers())) {
-      builder = HllCount.Init.forIntegers();
-    }
-    if (input.equals(new TypeDescriptor<byte[]>() {})) {
-      builder = HllCount.Init.forBytes();
-    }
-
-    if (builder == null) {
-      throw new IllegalArgumentException(String.format("Type not supported %s", input));
-    }
-
-    // Safe to ignore warning, as we know the type based on the check we do above.
-    @SuppressWarnings("unchecked")
-    Builder<T> output = (Builder<T>) builder;
-
-    return output;
-  }
-}
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java
deleted file mode 100644
index 8796d83..0000000
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.zetasketch;
-
-import com.google.zetasketch.HyperLogLogPlusPlus;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/** Tests for {@link ApproximateCountDistinct}. */
-public class ApproximateCountDistinctTest {
-
-  @Rule public final transient TestPipeline p = TestPipeline.create();
-
-  // Integer
-  private static final List<Integer> INTS1 = Arrays.asList(1, 2, 3, 3, 1, 4);
-  private static final Long INTS1_ESTIMATE;
-
-  private static final int TEST_PRECISION = 20;
-
-  static {
-    HyperLogLogPlusPlus<Integer> hll = new HyperLogLogPlusPlus.Builder().buildForIntegers();
-    INTS1.forEach(hll::add);
-    INTS1_ESTIMATE = hll.longResult();
-  }
-
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testIntegerBuilder() {
-
-    PCollection<Integer> ints = p.apply(Create.of(1));
-    HllCount.Init.Builder<Integer> builder =
-        ApproximateCountDistinct.<Integer>builderForType(
-            ints.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = ints.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testStringBuilder() {
-
-    PCollection<String> strings = p.apply(Create.<String>of("43"));
-    HllCount.Init.Builder<String> builder =
-        ApproximateCountDistinct.<String>builderForType(
-            strings.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = strings.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testLongBuilder() {
-
-    PCollection<Long> longs = p.apply(Create.<Long>of(1L));
-    HllCount.Init.Builder<Long> builder =
-        ApproximateCountDistinct.<Long>builderForType(longs.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = longs.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-  /** Test correct Builder is returned from Generic type. * */
-  @Test
-  public void testBytesBuilder() {
-
-    byte[] byteArray = new byte[] {'A'};
-    PCollection<byte[]> bytes = p.apply(Create.of(byteArray));
-    TypeDescriptor<byte[]> a = bytes.getCoder().getEncodedTypeDescriptor();
-    HllCount.Init.Builder<byte[]> builder =
-        ApproximateCountDistinct.<byte[]>builderForType(
-            bytes.getCoder().getEncodedTypeDescriptor());
-    PCollection<Long> result = bytes.apply(builder.globally()).apply(HllCount.Extract.globally());
-    PAssert.that(result).containsInAnyOrder(1L);
-    p.run();
-  }
-
-  /** Test Integer Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForInteger() {
-    PCollection<Long> approxResultInteger =
-        p.apply("Int", Create.of(INTS1)).apply("IntHLL", ApproximateCountDistinct.globally());
-    PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-    p.run();
-  }
-
-  /** Test Long Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForLong() {
-
-    PCollection<Long> approxResultLong =
-        p.apply("Long", Create.of(INTS1.stream().map(Long::valueOf).collect(Collectors.toList())))
-            .apply("LongHLL", ApproximateCountDistinct.globally());
-
-    PAssert.thatSingleton(approxResultLong).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test String Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForStrings() {
-    PCollection<Long> approxResultString =
-        p.apply("Str", Create.of(INTS1.stream().map(String::valueOf).collect(Collectors.toList())))
-            .apply("StrHLL", ApproximateCountDistinct.globally());
-
-    PAssert.thatSingleton(approxResultString).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test Byte Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesGlobalForBytes() {
-    PCollection<Long> approxResultByte =
-        p.apply(
-                "BytesHLL",
-                Create.of(
-                    INTS1.stream()
-                        .map(x -> ByteBuffer.allocate(4).putInt(x).array())
-                        .collect(Collectors.toList())))
-            .apply(ApproximateCountDistinct.globally());
-
-    PAssert.thatSingleton(approxResultByte).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test Integer Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForInteger() {
-
-    List<KV<Integer, Integer>> ints = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        ints.add(KV.of(i, k));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test Long Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForLong() {
-
-    List<KV<Integer, Long>> longs = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        longs.add(KV.of(i, (long) k));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test String Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForStrings() {
-    List<KV<Integer, String>> strings = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        strings.add(KV.of(i, String.valueOf(k)));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("Str", Create.of(strings)).apply("StrHLL", ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test Byte Globally. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testStandardTypesPerKeyForBytes() {
-
-    List<KV<Integer, byte[]>> bytes = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        bytes.add(KV.of(i, ByteBuffer.allocate(4).putInt(k).array()));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> result =
-        p.apply("BytesHLL", Create.of(bytes)).apply(ApproximateCountDistinct.perKey());
-
-    PAssert.that(result)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testObjectTypesGlobal() {
-
-    PCollection<Long> approxResultInteger =
-        p.apply(
-                "Int",
-                Create.of(
-                    INTS1.stream().map(x -> KV.of(x, KV.of(x, x))).collect(Collectors.toList())))
-            .apply(
-                "IntHLL",
-                ApproximateCountDistinct.<KV<Integer, KV<Integer, Integer>>>globally()
-                    .via((KV<Integer, KV<Integer, Integer>> x) -> (long) x.getValue().hashCode()));
-
-    PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testObjectTypesPerKey() {
-
-    List<KV<Integer, KV<Integer, Integer>>> ints = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        ints.add(KV.of(i, KV.of(i, k)));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> approxResultInteger =
-        p.apply("Int", Create.of(ints))
-            .apply(
-                "IntHLL",
-                ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey()
-                    .via(x -> KV.of(x.getKey(), (long) x.hashCode()))
-                    .withPercision(TEST_PRECISION));
-
-    PAssert.that(approxResultInteger)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testGlobalPercision() {
-
-    PCollection<Long> approxResultInteger =
-        p.apply("Int", Create.of(INTS1))
-            .apply("IntHLL", ApproximateCountDistinct.globally().withPercision(TEST_PRECISION));
-
-    PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE);
-
-    p.run();
-  }
-
-  /** Test a general object, we will make use of a KV as the object as it already has a coder. */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testPerKeyPercision() {
-
-    List<KV<Integer, Integer>> ints = new ArrayList<>();
-
-    for (int i = 0; i < 3; i++) {
-      for (int k : INTS1) {
-        ints.add(KV.of(i, k));
-      }
-    }
-
-    PCollection<KV<Integer, Long>> approxResultInteger =
-        p.apply("Int", Create.of(ints))
-            .apply("IntHLL", ApproximateCountDistinct.perKey().withPercision(TEST_PRECISION));
-
-    PAssert.that(approxResultInteger)
-        .containsInAnyOrder(
-            ImmutableList.of(
-                KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE)));
-
-    p.run();
-  }
-}