You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2017/12/01 02:15:42 UTC

[GitHub] keith-turner closed pull request #293: ACCUMULO-4669 Use windowed statistics in RFile

keith-turner closed pull request #293: ACCUMULO-4669 Use windowed statistics in RFile
URL: https://github.com/apache/accumulo/pull/293
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index c1931daed3..b0409aff00 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -73,7 +73,6 @@
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -403,7 +402,8 @@ public void flushIfNeeded() throws IOException {
 
     private SampleLocalityGroupWriter sample;
 
-    private SummaryStatistics keyLenStats = new SummaryStatistics();
+    // Use windowed stats to fix ACCUMULO-4669
+    private RollingStats keyLenStats = new RollingStats(2017);
     private double avergageKeySize = 0;
 
     LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup,
@@ -416,8 +416,9 @@ public void flushIfNeeded() throws IOException {
     }
 
     private boolean isGiantKey(Key k) {
-      // consider a key thats more than 3 standard deviations from previously seen key sizes as giant
-      return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3;
+      double mean = keyLenStats.getMean();
+      double stddev = keyLenStats.getStandardDeviation();
+      return k.getSize() > mean + Math.max(9 * mean, 4 * stddev);
     }
 
     public void append(Key key, Value value) throws IOException {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
new file mode 100644
index 0000000000..c0c5554889
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import org.apache.commons.math3.stat.StatUtils;
+import org.apache.commons.math3.util.FastMath;
+
+/**
+ * This class supports efficient window statistics. Apache commons math3 has a class called DescriptiveStatistics that supports windows. DescriptiveStatistics
+ * recomputes the statistics over the entire window each time they are requested. In a test over 1,000,000 entries with a window size of 1019 that requested stats
+ * for each entry this class took ~50ms and DescriptiveStatistics took ~6,000ms.
+ *
+ * <p>
+ * This class may not be as accurate as DescriptiveStatistics. In the unit tests its results are within 1/1000 of DescriptiveStatistics.
+ */
+class RollingStats {
+  private int position;
+  private double window[];
+
+  private double average;
+  private double variance;
+  private double stddev;
+
+  // indicates if the window is full
+  private boolean windowFull;
+
+  private int recomputeCounter = 0;
+
+  RollingStats(int windowSize) {
+    this.windowFull = false;
+    this.position = 0;
+    this.window = new double[windowSize];
+  }
+
+  /**
+   * @see <a href= "http://jonisalonen.com/2014/efficient-and-accurate-rolling-standard-deviation/">Efficient and accurate rolling standard deviation</a>
+   */
+  private void update(double newValue, double oldValue, int windowSize) {
+    double delta = newValue - oldValue;
+
+    double oldAverage = average;
+    average = average + delta / windowSize;
+    variance += delta * (newValue - average + oldValue - oldAverage) / (windowSize - 1);
+    stddev = FastMath.sqrt(variance);
+  }
+
+  void addValue(long stat) {
+
+    double old = window[position];
+    window[position] = stat;
+    position++;
+    recomputeCounter++;
+
+    if (windowFull) {
+      update(stat, old, window.length);
+    } else if (position == window.length) {
+      computeStats(window.length);
+      windowFull = true;
+    }
+
+    if (position == window.length) {
+      position = 0;
+    }
+  }
+
+  private void computeStats(int len) {
+    average = StatUtils.mean(window, 0, len);
+    variance = StatUtils.variance(window, average, 0, len);
+    stddev = FastMath.sqrt(variance);
+    recomputeCounter = 0;
+  }
+
+  private void computeStats() {
+    if (windowFull) {
+      if (variance < 0 || recomputeCounter >= 100) {
+        // incremental computation drifts over time, so periodically force a recompute
+        computeStats(window.length);
+      }
+    } else if (recomputeCounter > 0) {
+      computeStats(position);
+    }
+  }
+
+  double getMean() {
+    computeStats();
+    return average;
+  }
+
+  double getVariance() {
+    computeStats();
+    return variance;
+  }
+
+  double getStandardDeviation() {
+    computeStats();
+    return stddev;
+  }
+
+  boolean isWindowFull() {
+    return windowFull;
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
new file mode 100644
index 0000000000..19f9c5cde6
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.rfile;
+
+import java.util.Random;
+import java.util.function.IntSupplier;
+
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.commons.math3.random.Well19937c;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.math.DoubleMath;
+
+public class RolllingStatsTest {
+
+  private static final double TOLERANCE = 1.0 / 1000;
+
+  private static void assertFuzzyEquals(double expected, double actual) {
+    Assert.assertTrue(String.format("expected: %f, actual: %f diff: %f", expected, actual, Math.abs(expected - actual)),
+        DoubleMath.fuzzyEquals(expected, actual, TOLERANCE));
+  }
+
+  private static void checkAgreement(DescriptiveStatistics ds, RollingStats rs) {
+    // getting stats from ds is expensive, so do it once... otherwise unit test takes 11 sec
+    // instead of 5 secs
+    double expMean = ds.getMean();
+    double expVar = ds.getVariance();
+    double expStdDev = Math.sqrt(expVar);
+
+    assertFuzzyEquals(expMean, rs.getMean());
+    assertFuzzyEquals(expVar, rs.getVariance());
+    assertFuzzyEquals(expStdDev, rs.getStandardDeviation());
+
+    Assert.assertTrue(expMean >= 0);
+    Assert.assertTrue(rs.getMean() >= 0);
+    Assert.assertTrue(expVar >= 0);
+    Assert.assertTrue(rs.getVariance() >= 0);
+    Assert.assertTrue(expStdDev >= 0);
+    Assert.assertTrue(rs.getStandardDeviation() >= 0);
+  }
+
+  private static class StatTester {
+
+    Random rand = new Random(42);
+    private DescriptiveStatistics ds;
+    private RollingStats rs;
+    private RollingStats rsp;
+
+    StatTester(int windowSize) {
+      ds = new DescriptiveStatistics();
+      ds.setWindowSize(windowSize);
+
+      rs = new RollingStats(windowSize);
+      rsp = new RollingStats(windowSize);
+    }
+
+    void addValue(long v) {
+      ds.addValue(v);
+      rs.addValue(v);
+      rsp.addValue(v);
+      checkAgreement(ds, rs);
+
+      if (rand.nextDouble() < 0.001) {
+        checkAgreement(ds, rsp);
+      }
+    }
+
+    void check() {
+      checkAgreement(ds, rsp);
+    }
+  }
+
+  @Test
+  public void testFewSizes() {
+    StatTester st = new StatTester(1019);
+    int[] keySizes = new int[] {103, 113, 123, 2345};
+    Random rand = new Random(42);
+    for (int i = 0; i < 10000; i++) {
+      st.addValue(keySizes[rand.nextInt(keySizes.length)]);
+    }
+    st.check();
+  }
+
+  @Test
+  public void testConstant() {
+
+    StatTester st = new StatTester(1019);
+
+    for (int i = 0; i < 10000; i++) {
+      st.addValue(111);
+    }
+
+    st.check();
+  }
+
+  @Test
+  public void testUniformIncreasing() {
+
+    for (int windowSize : new int[] {10, 13, 20, 100, 500}) {
+
+      StatTester st = new StatTester(windowSize);
+
+      Random rand = new Random();
+
+      for (int i = 0; i < 1000; i++) {
+        int v = 200 + rand.nextInt(50);
+
+        st.addValue(v);
+      }
+
+      st.check();
+    }
+  }
+
+  @Test
+  public void testSlowIncreases() {
+    // key length shared by every key in a batch; it grows by about 10% after each batch
+    int len = 100;
+
+    StatTester st = new StatTester(1019);
+
+    for (int i = 0; i < 50; i++) {
+      for (int j = 0; j < 3000; j++) {
+        st.addValue(len);
+      }
+
+      len = (int) (len * 1.1);
+    }
+
+    st.check();
+  }
+
+  private void testDistribrution(IntSupplier d) {
+    StatTester st = new StatTester(2017);
+
+    for (int i = 0; i < 7000; i++) {
+      st.addValue(d.getAsInt());
+    }
+
+    st.check();
+  }
+
+  @Test
+  public void testZipf() {
+    ZipfDistribution zd = new ZipfDistribution(new Well19937c(42), 1000, 2);
+    testDistribrution(() -> zd.sample() * 100);
+  }
+
+  @Test
+  public void testNormal() {
+    NormalDistribution nd = new NormalDistribution(new Well19937c(42), 200, 20);
+    testDistribrution(() -> (int) nd.sample());
+  }
+
+  @Test
+  public void testSpikes() {
+
+    Random rand = new Random();
+
+    StatTester st = new StatTester(3017);
+
+    for (int i = 0; i < 13; i++) {
+
+      // write small keys
+      int numSmall = 1000 + rand.nextInt(1000);
+      for (int s = 0; s < numSmall; s++) {
+        int sks = 50 + rand.nextInt(100);
+        // simulate row with multiple cols
+        for (int c = 0; c < 3; c++) {
+          st.addValue(sks);
+        }
+      }
+
+      // write one large key per round (rand.nextInt(1) always returns 0, so numLarge is always 1)
+      int numLarge = 1 + rand.nextInt(1);
+      for (int l = 0; l < numLarge; l++) {
+        int lks = 500000 + rand.nextInt(1000000);
+        for (int c = 0; c < 3; c++) {
+          st.addValue(lks);
+        }
+      }
+    }
+
+    st.check();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services