You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/12/01 02:12:44 UTC

[accumulo] branch 1.8 updated (ed313f7 -> 06cb5ed)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a change to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from ed313f7  ACCUMULO-4744 Fixed RFile API scanner bug (#324)
     add c28e11c  ACCUMULO-4669 Use windowed statistics in RFile
     new 06cb5ed  Merge branch '1.7' into 1.8

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revision
listed as "add" was already present in the repository and has only
been added to this reference.


Summary of changes:
 .../org/apache/accumulo/core/file/rfile/RFile.java |   9 +-
 .../accumulo/core/file/rfile/RollingStats.java     | 114 ++++++++++++
 .../core/file/rfile/RolllingStatsTest.java         | 205 +++++++++++++++++++++
 3 files changed, 324 insertions(+), 4 deletions(-)
 create mode 100644 core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
 create mode 100644 core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.

[accumulo] 01/01: Merge branch '1.7' into 1.8

Posted by kt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 06cb5ed4299d386612fd214ad0a1ab75c156d685
Merge: ed313f7 c28e11c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Nov 30 20:49:01 2017 -0500

    Merge branch '1.7' into 1.8

 .../org/apache/accumulo/core/file/rfile/RFile.java |   9 +-
 .../accumulo/core/file/rfile/RollingStats.java     | 114 ++++++++++++
 .../core/file/rfile/RolllingStatsTest.java         | 205 +++++++++++++++++++++
 3 files changed, 324 insertions(+), 4 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 4539392,fe1c832..cda246a
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@@ -327,103 -282,108 +326,105 @@@ public class RFile 
  
    }
  
 -  public static class Writer implements FileSKVWriter {
 -
 -    public static final int MAX_CF_IN_DLG = 1000;
 -    private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 -
 -    private BlockFileWriter fileWriter;
 -    private ABlockWriter blockWriter;
 +  private static class SampleEntry {
 +    Key key;
 +    Value val;
  
 -    // private BlockAppender blockAppender;
 -    private final long blockSize;
 -    private final long maxBlockSize;
 -    private final int indexBlockSize;
 -    private int entries = 0;
 -
 -    private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<>();
 -    private LocalityGroupMetadata currentLocalityGroup = null;
 -    private int nextBlock = 0;
 +    SampleEntry(Key key, Value val) {
 +      this.key = new Key(key);
 +      this.val = new Value(val);
 +    }
 +  }
  
 -    private Key lastKeyInBlock = null;
 +  private static class SampleLocalityGroupWriter {
  
 -    private boolean dataClosed = false;
 -    private boolean closed = false;
 -    private Key prevKey = new Key();
 -    private boolean startedDefaultLocalityGroup = false;
 +    private Sampler sampler;
  
 -    private HashSet<ByteSequence> previousColumnFamilies;
 +    private List<SampleEntry> entries = new ArrayList<>();
 +    private long dataSize = 0;
  
 -    // Use windowed stats to fix ACCUMULO-4669
 -    private RollingStats keyLenStats = new RollingStats(2017);
 -    private double avergageKeySize = 0;
 +    private LocalityGroupWriter lgr;
  
 -    public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
 -      this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
 +    public SampleLocalityGroupWriter(LocalityGroupWriter lgr, Sampler sampler) {
 +      this.lgr = lgr;
 +      this.sampler = sampler;
      }
  
 -    public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
 -      this.blockSize = blockSize;
 -      this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
 -      this.indexBlockSize = indexBlockSize;
 -      this.fileWriter = bfw;
 -      this.blockWriter = null;
 -      previousColumnFamilies = new HashSet<>();
 +    public void append(Key key, Value value) throws IOException {
 +      if (sampler.accept(key)) {
 +        entries.add(new SampleEntry(key, value));
 +        dataSize += key.getSize() + value.getSize();
 +      }
      }
  
 -    @Override
 -    public synchronized void close() throws IOException {
 -
 -      if (closed) {
 -        return;
 +    public void close() throws IOException {
 +      for (SampleEntry se : entries) {
 +        lgr.append(se.key, se.val);
        }
  
 -      closeData();
 +      lgr.close();
 +    }
  
 -      ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
 +    public void flushIfNeeded() throws IOException {
 +      if (dataSize > sampleBufferSize) {
 +        // the reason to write out all but one key is so that closeBlock() can always eventually be called with true
 +        List<SampleEntry> subList = entries.subList(0, entries.size() - 1);
  
 -      mba.writeInt(RINDEX_MAGIC);
 -      mba.writeInt(RINDEX_VER_7);
 -
 -      if (currentLocalityGroup != null)
 -        localityGroups.add(currentLocalityGroup);
 +        if (subList.size() > 0) {
 +          for (SampleEntry se : subList) {
 +            lgr.append(se.key, se.val);
 +          }
  
 -      mba.writeInt(localityGroups.size());
 +          lgr.closeBlock(subList.get(subList.size() - 1).key, false);
  
 -      for (LocalityGroupMetadata lc : localityGroups) {
 -        lc.write(mba);
 +          subList.clear();
 +          dataSize = 0;
 +        }
        }
 +    }
 +  }
  
 -      mba.close();
 +  private static class LocalityGroupWriter {
  
 -      fileWriter.close();
 +    private BlockFileWriter fileWriter;
 +    private ABlockWriter blockWriter;
  
 -      closed = true;
 -    }
 +    // private BlockAppender blockAppender;
 +    private final long blockSize;
 +    private final long maxBlockSize;
 +    private int entries = 0;
  
 -    private void closeData() throws IOException {
 +    private LocalityGroupMetadata currentLocalityGroup = null;
  
 -      if (dataClosed) {
 -        return;
 -      }
 +    private Key lastKeyInBlock = null;
  
 -      dataClosed = true;
 +    private Key prevKey = new Key();
  
 -      if (blockWriter != null) {
 -        closeBlock(lastKeyInBlock, true);
 -      }
 +    private SampleLocalityGroupWriter sample;
 +
-     private SummaryStatistics keyLenStats = new SummaryStatistics();
++    // Use windowed stats to fix ACCUMULO-4669
++    private RollingStats keyLenStats = new RollingStats(2017);
 +    private double avergageKeySize = 0;
 +
 +    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup,
 +        SampleLocalityGroupWriter sample) {
 +      this.fileWriter = fileWriter;
 +      this.blockSize = blockSize;
 +      this.maxBlockSize = maxBlockSize;
 +      this.currentLocalityGroup = currentLocalityGroup;
 +      this.sample = sample;
      }
  
     private boolean isGiantKey(Key k) {
-       // consider a key thats more than 3 standard deviations from previously seen key sizes as giant
-       return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3;
+       double mean = keyLenStats.getMean();
+       double stddev = keyLenStats.getStandardDeviation();
+       return k.getSize() > mean + Math.max(9 * mean, 4 * stddev); // giant: size exceeds the windowed mean by more than max(9 * mean, 4 * stddev)
     }
  
 -    @Override
      public void append(Key key, Value value) throws IOException {
  
 -      if (dataClosed) {
 -        throw new IllegalStateException("Cannont append, data closed");
 -      }
 -
        if (key.compareTo(prevKey) < 0) {
 -        throw new IllegalStateException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
 +        throw new IllegalArgumentException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
        }
  
        currentLocalityGroup.updateColumnCount(key);
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
index 0000000,d223574..c0c5554
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
@@@ -1,0 -1,113 +1,114 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  * agreements. See the NOTICE file distributed with this work for additional information regarding
+  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License. You may obtain a
+  * copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License
+  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  * or implied. See the License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ package org.apache.accumulo.core.file.rfile;
+ 
 -import org.apache.commons.math.stat.StatUtils;
++import org.apache.commons.math3.stat.StatUtils;
++import org.apache.commons.math3.util.FastMath;
+ 
+ /**
+  * This class supports efficient window statistics. Apache commons math3 has a class called DescriptiveStatistics that supports windows. DescriptiveStatistics
+  * recomputes the statistics over the entire window each time they are requested. In a test over 1,000,000 entries with a window size of 1019 that requested
+  * stats for each entry this class took ~50ms and DescriptiveStatistics took ~6,000ms.
+  *
+  * <p>
+  * This class may not be as accurate as DescriptiveStatistics. In unit tests it is within 1/1000 of DescriptiveStatistics.
+  *
+  * <p>
+  * Values are kept in a fixed-size circular buffer. Until the buffer fills, statistics are recomputed from the stored values when requested. Once it is full
+  * they are updated incrementally as each new value overwrites the oldest one, and a full recompute is done on request once 100 values have been added since
+  * the last recompute (or the running variance has gone negative).
+  */
+ class RollingStats {
+   private int position; // index in window where the next value will be written
+   private double window[]; // circular buffer holding the most recent values
+ 
+   private double average;
+   private double variance;
+   private double stddev;
+ 
+   // indicates if the window is full
+   private boolean windowFull;
+ 
+   private int recomputeCounter = 0; // values added since the last full recompute; reset in computeStats(int)
+ 
+   RollingStats(int windowSize) {
+     this.windowFull = false;
+     this.position = 0;
+     this.window = new double[windowSize];
+   }
+ 
+   /**
+    * Incrementally adjusts average, variance and stddev for a full window in which newValue has just replaced oldValue.
+    *
+    * @see <a href= "http://jonisalonen.com/2014/efficient-and-accurate-rolling-standard-deviation/">Efficient and accurate rolling standard deviation</a>
+    */
+   private void update(double newValue, double oldValue, int windowSize) {
+     double delta = newValue - oldValue;
+ 
+     double oldAverage = average;
+     average = average + delta / windowSize;
+     variance += delta * (newValue - average + oldValue - oldAverage) / (windowSize - 1); // NOTE(review): windowSize == 1 divides by zero here (NaN/Infinity in double arithmetic)
 -    stddev = Math.sqrt(variance);
++    stddev = FastMath.sqrt(variance);
+   }
+ 
+   void addValue(long stat) {
+ 
+     double old = window[position]; // value about to be overwritten; only used once the window is full
+     window[position] = stat;
+     position++;
+     recomputeCounter++;
+ 
+     if (windowFull) {
+       update(stat, old, window.length);
+     } else if (position == window.length) { // the window has just filled for the first time
+       computeStats(window.length);
+       windowFull = true;
+     }
+ 
+     if (position == window.length) { // wrap around to the start of the circular buffer
+       position = 0;
+     }
+   }
+ 
+   private void computeStats(int len) { // full recompute over the first len slots of window
+     average = StatUtils.mean(window, 0, len);
+     variance = StatUtils.variance(window, average, 0, len);
 -    stddev = Math.sqrt(variance);
++    stddev = FastMath.sqrt(variance);
+     recomputeCounter = 0;
+   }
+ 
+   private void computeStats() { // called by every getter before it returns a statistic
+     if (windowFull) {
+       if (variance < 0 || recomputeCounter >= 100) { // presumably variance < 0 catches error accumulated by update() - confirm
+         // incremental computation drifts over time, so periodically force a recompute
+         computeStats(window.length);
+       }
+     } else if (recomputeCounter > 0) { // window not full yet: recompute only if values were added since the last recompute
+       computeStats(position);
+     }
+   }
+ 
+   double getMean() {
+     computeStats();
+     return average;
+   }
+ 
+   double getVariance() {
+     computeStats();
+     return variance;
+   }
+ 
+   double getStandardDeviation() {
+     computeStats();
+     return stddev;
+   }
+ 
+   boolean isWindowFull() {
+     return windowFull;
+   }
+ }
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
index 0000000,28e1b20..4f8fcd1
mode 000000,100644..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
@@@ -1,0 -1,178 +1,205 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.accumulo.core.file.rfile;
+ 
+ import java.util.Random;
+ 
 -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
++import org.apache.commons.math3.distribution.NormalDistribution;
++import org.apache.commons.math3.distribution.ZipfDistribution;
++import org.apache.commons.math3.random.Well19937c;
++import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import com.google.common.math.DoubleMath;
+ 
+ public class RolllingStatsTest {
+ 
+   private static final double TOLERANCE = 1.0 / 1000; // tolerance passed to DoubleMath.fuzzyEquals in assertFuzzyEquals
+ 
+   private static void assertFuzzyEquals(double expected, double actual) {
+     Assert.assertTrue(String.format("expected: %f, actual: %f diff: %f", expected, actual, Math.abs(expected - actual)),
+         DoubleMath.fuzzyEquals(expected, actual, TOLERANCE));
+   }
+ 
+   private static void checkAgreement(DescriptiveStatistics ds, RollingStats rs) {
+     // getting stats from ds is expensive, so do it once... otherwise unit test takes 11 sec
+     // instead of 5 secs
+     double expMean = ds.getMean();
+     double expVar = ds.getVariance();
+     double expStdDev = Math.sqrt(expVar);
+ 
+     assertFuzzyEquals(expMean, rs.getMean());
+     assertFuzzyEquals(expVar, rs.getVariance());
+     assertFuzzyEquals(expStdDev, rs.getStandardDeviation());
+ 
+     Assert.assertTrue(expMean >= 0); // all test inputs are non-negative, so no statistic should be negative
+     Assert.assertTrue(rs.getMean() >= 0);
+     Assert.assertTrue(expVar >= 0);
+     Assert.assertTrue(rs.getVariance() >= 0);
+     Assert.assertTrue(expStdDev >= 0);
+     Assert.assertTrue(rs.getStandardDeviation() >= 0);
+   }
+ 
+   private static class StatTester {
+ 
+     Random rand = new Random(42);
+     private DescriptiveStatistics ds; // reference implementation the rolling stats are compared against
+     private RollingStats rs; // stats are requested after every added value
+     private RollingStats rsp; // receives the same values as rs, but stats are requested only sporadically
+ 
+     StatTester(int windowSize) {
+       ds = new DescriptiveStatistics();
+       ds.setWindowSize(windowSize);
+ 
+       rs = new RollingStats(windowSize);
+       rsp = new RollingStats(windowSize);
+     }
+ 
+     void addValue(long v) {
+       ds.addValue(v);
+       rs.addValue(v);
+       rsp.addValue(v);
+       checkAgreement(ds, rs);
+ 
+       if (rand.nextDouble() < 0.001) { // check rsp for roughly 1 in 1000 values, so it usually sees many additions between stat requests
+         checkAgreement(ds, rsp);
+       }
+     }
+ 
+     void check() {
+       checkAgreement(ds, rsp);
+     }
+   }
+ 
+   @Test
+   public void testFewSizes() {
+     StatTester st = new StatTester(1019);
+     int[] keySizes = new int[] {103, 113, 123, 2345};
+     Random rand = new Random(42);
+     for (int i = 0; i < 10000; i++) {
+       st.addValue(keySizes[rand.nextInt(keySizes.length)]);
+     }
+     st.check();
+   }
+ 
+   @Test
+   public void testConstant() {
+ 
+     StatTester st = new StatTester(1019);
+ 
+     for (int i = 0; i < 10000; i++) {
+       st.addValue(111);
+     }
+ 
+     st.check();
+   }
+ 
+   @Test
+   public void testUniformIncreasing() { // NOTE(review): values are uniform random in [200, 250); nothing increases despite the name
+ 
+     for (int windowSize : new int[] {10, 13, 20, 100, 500}) {
+ 
+       StatTester st = new StatTester(windowSize);
+ 
+       Random rand = new Random(); // NOTE(review): unseeded, so the inputs differ on every run
+ 
+       for (int i = 0; i < 1000; i++) {
+         int v = 200 + rand.nextInt(50);
+ 
+         st.addValue(v);
+       }
+ 
+       st.check();
+     }
+   }
+ 
+   @Test
+   public void testSlowIncreases() {
+     // number of keys with the same len
+     int len = 100;
+ 
+     StatTester st = new StatTester(1019);
+ 
+     for (int i = 0; i < 50; i++) {
+       for (int j = 0; j < 3000; j++) {
+         st.addValue(len);
+       }
+ 
+       len = (int) (len * 1.1); // grow the value by 10% (truncated to int) for the next run of 3000
+     }
+ 
+     st.check();
+   }
+ 
+   @Test
++  public void testZipf() {
++    ZipfDistribution zd = new ZipfDistribution(new Well19937c(42), 1000, 2);
++    StatTester st = new StatTester(2017);
++
++    for (int i = 0; i < 7000; i++) {
++      st.addValue(zd.sample() * 100);
++    }
++
++    st.check();
++  }
++
++  @Test
++  public void testNormal() {
++    NormalDistribution nd = new NormalDistribution(new Well19937c(42), 200, 20);
++    StatTester st = new StatTester(2017);
++
++    for (int i = 0; i < 7000; i++) {
++      st.addValue((int) nd.sample());
++    }
++
++    st.check();
++  }
++
++  @Test
+   public void testSpikes() {
+ 
+     Random rand = new Random(); // NOTE(review): unseeded, so the inputs differ on every run
+ 
+     StatTester st = new StatTester(3017);
+ 
+     for (int i = 0; i < 13; i++) {
+ 
+       // write small keys
+       int numSmall = 1000 + rand.nextInt(1000);
+       for (int s = 0; s < numSmall; s++) {
+         int sks = 50 + rand.nextInt(100);
+         // simulate row with multiple cols
+         for (int c = 0; c < 3; c++) {
+           st.addValue(sks);
+         }
+       }
+ 
+       // write a few large keys
+       int numLarge = 1 + rand.nextInt(1); // NOTE(review): nextInt(1) always returns 0, so numLarge is always 1
+       for (int l = 0; l < numLarge; l++) {
+         int lks = 500000 + rand.nextInt(1000000);
+         for (int c = 0; c < 3; c++) {
+           st.addValue(lks);
+         }
+       }
+     }
+ 
+     st.check();
+   }
+ }

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.