You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@druid.apache.org by cw...@apache.org on 2019/03/06 22:50:12 UTC

[incubator-druid] branch master updated: Densify swapped hll buffer (#6865)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ed2507  Densify swapped hll buffer (#6865)
3ed2507 is described below

commit 3ed250787d06b74a2ac36d6a81df7cc2bb08eea0
Author: Charles Allen <ch...@allen-net.com>
AuthorDate: Wed Mar 6 14:50:04 2019 -0800

    Densify swapped hll buffer (#6865)
    
    * Densify swapped hll buffer
    
    * Make test loop limit pre-increment
    
    * Reformat
    
    * Fix test comments
---
 .../org/apache/druid/hll/HyperLogLogCollector.java |   7 ++
 .../apache/druid/hll/HyperLogLogCollectorTest.java | 113 +++++++++++++++++----
 2 files changed, 103 insertions(+), 17 deletions(-)

diff --git a/hll/src/main/java/org/apache/druid/hll/HyperLogLogCollector.java b/hll/src/main/java/org/apache/druid/hll/HyperLogLogCollector.java
index 5fd7df7..8291214 100644
--- a/hll/src/main/java/org/apache/druid/hll/HyperLogLogCollector.java
+++ b/hll/src/main/java/org/apache/druid/hll/HyperLogLogCollector.java
@@ -387,6 +387,13 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
 
       storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer());
 
+      if (other.storageBuffer.remaining() != other.getNumBytesForDenseStorage()) {
+        // The other buffer was sparse, densify it
+        final int newLImit = storageBuffer.position() + other.storageBuffer.remaining();
+        storageBuffer.limit(newLImit);
+        convertToDenseStorage();
+      }
+
       other = HyperLogLogCollector.makeCollector(tmpBuffer);
     }
 
diff --git a/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java b/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java
index ffcbd988..3727717 100644
--- a/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java
+++ b/hll/src/test/java/org/apache/druid/hll/HyperLogLogCollectorTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.hll;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -30,14 +31,17 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.security.MessageDigest;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Predicate;
 
 /**
+ *
  */
 public class HyperLogLogCollectorTest
 {
@@ -45,6 +49,18 @@ public class HyperLogLogCollectorTest
 
   private final HashFunction fn = Hashing.murmur3_128();
 
+  private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
+  {
+    byte offset = startOffset;
+    while (offset <= endOffset) {
+      // fill buckets to shift registerOffset
+      for (short bucket = 0; bucket < 2048; ++bucket) {
+        collector.add(bucket, offset);
+      }
+      offset++;
+    }
+  }
+
   @Test
   public void testFolding()
   {
@@ -78,14 +94,13 @@ public class HyperLogLogCollectorTest
     }
   }
 
-
   /**
    * This is a very long running test, disabled by default.
    * It is meant to catch issues when combining a large numer of HLL objects.
    *
    * It compares adding all the values to one HLL vs.
    * splitting up values into HLLs of 100 values each, and folding those HLLs into a single main HLL.
-   *
+   * 
    * When reaching very large cardinalities (>> 50,000,000), offsets are mismatched between the main HLL and the ones
    * with 100 values, requiring  a floating max as described in
    * http://druid.io/blog/2014/02/18/hyperloglog-optimizations-for-real-world-systems.html
@@ -502,7 +517,8 @@ public class HyperLogLogCollectorTest
     return retVal;
   }
 
-  @Ignore @Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
+  @Ignore
+  @Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
   public void testFoldingwithDifferentOffsets()
   {
     // final Random random = new Random(37); // this seed will cause this test to fail because of slightly larger errors
@@ -533,7 +549,8 @@ public class HyperLogLogCollectorTest
     }
   }
 
-  @Ignore @Test
+  @Ignore
+  @Test
   public void testFoldingwithDifferentOffsets2() throws Exception
   {
     final Random random = new Random(0);
@@ -708,6 +725,81 @@ public class HyperLogLogCollectorTest
   }
 
   @Test
+  public void testRegisterSwapWithSparse()
+  {
+    final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
+    // Skip the first bucket
+    for (int i = 1; i < HyperLogLogCollector.NUM_BUCKETS; i++) {
+      collector.add((short) i, (byte) 1);
+      Assert.assertEquals(i, collector.getNumNonZeroRegisters());
+      Assert.assertEquals(0, collector.getRegisterOffset());
+    }
+    Assert.assertEquals(
+        15615.219683654448D,
+        HyperLogLogCollector.makeCollector(collector.toByteBuffer().asReadOnlyBuffer())
+                            .estimateCardinality(),
+        1e-5D
+    );
+
+    final byte[] hash = new byte[10];
+    hash[0] = 1; // Bucket 0, 1 offset of 0
+    collector.add(hash);
+    Assert.assertEquals(0, collector.getNumNonZeroRegisters());
+    Assert.assertEquals(1, collector.getRegisterOffset());
+
+    // We have a REALLY bad distribution, Sketch as 0 is fine.
+    Assert.assertEquals(
+        0.0D,
+        HyperLogLogCollector.makeCollector(collector.toByteBuffer().asReadOnlyBuffer())
+                            .estimateCardinality(),
+        1e-5D
+    );
+    final ByteBuffer buffer = collector.toByteBuffer();
+    Assert.assertEquals(collector.getNumHeaderBytes(), buffer.remaining());
+
+    final HyperLogLogCollector denseCollector = HyperLogLogCollector.makeLatestCollector();
+    for (int i = 0; i < HyperLogLogCollector.NUM_BUCKETS - 1; i++) {
+      denseCollector.add((short) i, (byte) 1);
+    }
+
+    Assert.assertEquals(HyperLogLogCollector.NUM_BUCKETS - 1, denseCollector.getNumNonZeroRegisters());
+    final HyperLogLogCollector folded = denseCollector.fold(HyperLogLogCollector.makeCollector(buffer));
+    Assert.assertNotNull(folded.toByteBuffer());
+    Assert.assertEquals(folded.getStorageBuffer().remaining(), denseCollector.getNumBytesForDenseStorage());
+  }
+
+  // Example of a terrible sampling filter. Don't use this method
+  @Test
+  public void testCanFillUpOnMod()
+  {
+    final HashFunction fn = Hashing.murmur3_128();
+    final HyperLogLogCollector hyperLogLogCollector = HyperLogLogCollector.makeLatestCollector();
+    final byte[] b = new byte[10];
+    b[0] = 1;
+    hyperLogLogCollector.add(b);
+    final Random random = new Random(347893248701078L);
+    long loops = 0;
+    // Do a 1% "sample" where the mod of the hash is 43
+    final Predicate<Integer> pass = i -> {
+      // ByteOrder.nativeOrder() on lots of systems is ByteOrder.LITTLE_ENDIAN
+      final ByteBuffer bb = ByteBuffer.wrap(fn.hashInt(i).asBytes()).order(ByteOrder.LITTLE_ENDIAN);
+      return (bb.getInt() % 100) == 43;
+    };
+    final long loopLimit = 1_000_000_000L;
+    do {
+      final int rnd = random.nextInt();
+      if (!pass.test(rnd)) {
+        continue;
+      }
+      final Hasher hasher = fn.newHasher();
+      hasher.putInt(rnd);
+      hyperLogLogCollector.add(hasher.hash().asBytes());
+    } while (hyperLogLogCollector.getNumNonZeroRegisters() > 0 && ++loops < loopLimit);
+    Assert.assertNotEquals(loopLimit, loops);
+    Assert.assertEquals(hyperLogLogCollector.getNumHeaderBytes(), hyperLogLogCollector.toByteBuffer().remaining());
+  }
+
+  @Test
   public void testMergeMaxOverflow()
   {
     // no offset
@@ -736,19 +828,6 @@ public class HyperLogLogCollectorTest
     Assert.assertEquals(67, collector.getMaxOverflowValue());
   }
 
-
-  private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
-  {
-    byte offset = startOffset;
-    while (offset <= endOffset) {
-      // fill buckets to shift registerOffset
-      for (short bucket = 0; bucket < 2048; ++bucket) {
-        collector.add(bucket, offset);
-      }
-      offset++;
-    }
-  }
-
   @Test
   public void testFoldOrder()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org