You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/12/01 02:12:45 UTC

[accumulo] 01/01: Merge branch '1.7' into 1.8

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 06cb5ed4299d386612fd214ad0a1ab75c156d685
Merge: ed313f7 c28e11c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Nov 30 20:49:01 2017 -0500

    Merge branch '1.7' into 1.8

 .../org/apache/accumulo/core/file/rfile/RFile.java |   9 +-
 .../accumulo/core/file/rfile/RollingStats.java     | 114 ++++++++++++
 .../core/file/rfile/RolllingStatsTest.java         | 205 +++++++++++++++++++++
 3 files changed, 324 insertions(+), 4 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 4539392,fe1c832..cda246a
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@@ -327,103 -282,108 +326,105 @@@ public class RFile 
  
    }
  
 -  public static class Writer implements FileSKVWriter {
 -
 -    public static final int MAX_CF_IN_DLG = 1000;
 -    private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 -
 -    private BlockFileWriter fileWriter;
 -    private ABlockWriter blockWriter;
 +  private static class SampleEntry { // buffered copy of one sampled key/value pair
 +    Key key;
 +    Value val;
  
 -    // private BlockAppender blockAppender;
 -    private final long blockSize;
 -    private final long maxBlockSize;
 -    private final int indexBlockSize;
 -    private int entries = 0;
 -
 -    private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<>();
 -    private LocalityGroupMetadata currentLocalityGroup = null;
 -    private int nextBlock = 0;
 +    SampleEntry(Key key, Value val) { // copies both; presumably because callers may reuse the passed Key/Value objects — confirm
 +      this.key = new Key(key);
 +      this.val = new Value(val);
 +    }
 +  }
  
 -    private Key lastKeyInBlock = null;
 +  private static class SampleLocalityGroupWriter { // buffers entries accepted by sampler and writes them to lgr
  
 -    private boolean dataClosed = false;
 -    private boolean closed = false;
 -    private Key prevKey = new Key();
 -    private boolean startedDefaultLocalityGroup = false;
 +    private Sampler sampler;
  
 -    private HashSet<ByteSequence> previousColumnFamilies;
 +    private List<SampleEntry> entries = new ArrayList<>(); // sampled entries not yet written to lgr
 +    private long dataSize = 0; // key + value bytes added to entries since the last flush
  
 -    // Use windowed stats to fix ACCUMULO-4669
 -    private RollingStats keyLenStats = new RollingStats(2017);
 -    private double avergageKeySize = 0;
 +    private LocalityGroupWriter lgr; // writer that receives the sample data
  
 -    public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
 -      this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
 +    public SampleLocalityGroupWriter(LocalityGroupWriter lgr, Sampler sampler) {
 +      this.lgr = lgr;
 +      this.sampler = sampler;
     }
  
 -    public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
 -      this.blockSize = blockSize;
 -      this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
 -      this.indexBlockSize = indexBlockSize;
 -      this.fileWriter = bfw;
 -      this.blockWriter = null;
 -      previousColumnFamilies = new HashSet<>();
 +    public void append(Key key, Value value) throws IOException { // buffers a copy of the entry only when sampler accepts its key
 +      if (sampler.accept(key)) {
 +        entries.add(new SampleEntry(key, value));
 +        dataSize += key.getSize() + value.getSize();
 +      }
     }
  
 -    @Override
 -    public synchronized void close() throws IOException {
 -
 -      if (closed) {
 -        return;
 +    public void close() throws IOException { // writes every still-buffered entry, then closes lgr
 +      for (SampleEntry se : entries) {
 +        lgr.append(se.key, se.val);
       }
  
 -      closeData();
 +      lgr.close();
 +    }
  
 -      ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
 +    public void flushIfNeeded() throws IOException { // once more than sampleBufferSize bytes are buffered, writes all but the last entry
 +      if (dataSize > sampleBufferSize) { // NOTE(review): sampleBufferSize is not declared in this class; presumably a static field of the enclosing RFile — confirm
 +        // the reason to write out all but one key is so that closeBlock() can always eventually be called with true
 +        List<SampleEntry> subList = entries.subList(0, entries.size() - 1); // view over entries; clearing it below removes those entries
  
 -      mba.writeInt(RINDEX_MAGIC);
 -      mba.writeInt(RINDEX_VER_7);
 -
 -      if (currentLocalityGroup != null)
 -        localityGroups.add(currentLocalityGroup);
 +        if (subList.size() > 0) {
 +          for (SampleEntry se : subList) {
 +            lgr.append(se.key, se.val);
 +          }
  
 -      mba.writeInt(localityGroups.size());
 +          lgr.closeBlock(subList.get(subList.size() - 1).key, false); // presumably ends the current block at the last key written — closeBlock() is not visible here
  
 -      for (LocalityGroupMetadata lc : localityGroups) {
 -        lc.write(mba);
 +          subList.clear();
 +          dataSize = 0; // NOTE(review): the size of the one retained entry is no longer counted after this reset
 +        }
       }
 +    }
 +  }
  
 -      mba.close();
 +  private static class LocalityGroupWriter {
  
 -      fileWriter.close();
 +    private BlockFileWriter fileWriter;
 +    private ABlockWriter blockWriter;
  
 -      closed = true;
 -    }
 +    // private BlockAppender blockAppender;
 +    private final long blockSize;
 +    private final long maxBlockSize;
 +    private int entries = 0;
  
 -    private void closeData() throws IOException {
 +    private LocalityGroupMetadata currentLocalityGroup = null;
  
 -      if (dataClosed) {
 -        return;
 -      }
 +    private Key lastKeyInBlock = null;
  
 -      dataClosed = true;
 +    private Key prevKey = new Key();
  
 -      if (blockWriter != null) {
 -        closeBlock(lastKeyInBlock, true);
 -      }
 +    private SampleLocalityGroupWriter sample;
 +
-     private SummaryStatistics keyLenStats = new SummaryStatistics();
++    // Use windowed stats to fix ACCUMULO-4669
++    private RollingStats keyLenStats = new RollingStats(2017);
 +    private double avergageKeySize = 0;
 +
 +    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup,
 +        SampleLocalityGroupWriter sample) { // only stores its arguments; no block is opened here
 +      this.fileWriter = fileWriter;
 +      this.blockSize = blockSize;
 +      this.maxBlockSize = maxBlockSize;
 +      this.currentLocalityGroup = currentLocalityGroup;
 +      this.sample = sample;
    }
  
    private boolean isGiantKey(Key k) { // true when k is far larger than the key sizes tracked by keyLenStats (a RollingStats window)
-       // consider a key thats more than 3 standard deviations from previously seen key sizes as giant
-       return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3;
+       double mean = keyLenStats.getMean();
+       double stddev = keyLenStats.getStandardDeviation();
+       return k.getSize() > mean + Math.max(9 * mean, 4 * stddev); // threshold = max(10 * mean, mean + 4 * stddev); the 9 * mean floor stops near-constant sizes (stddev ~ 0) from flagging slightly larger keys
    }
  
 -    @Override
      public void append(Key key, Value value) throws IOException {
  
 -      if (dataClosed) {
 -        throw new IllegalStateException("Cannont append, data closed");
 -      }
 -
        if (key.compareTo(prevKey) < 0) {
 -        throw new IllegalStateException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
 +        throw new IllegalArgumentException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
        }
  
        currentLocalityGroup.updateColumnCount(key);
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
index 0000000,d223574..c0c5554
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
@@@ -1,0 -1,113 +1,114 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  * agreements. See the NOTICE file distributed with this work for additional information regarding
+  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License. You may obtain a
+  * copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License
+  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  * or implied. See the License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ package org.apache.accumulo.core.file.rfile;
+ 
 -import org.apache.commons.math.stat.StatUtils;
++import org.apache.commons.math3.stat.StatUtils;
++import org.apache.commons.math3.util.FastMath;
+ 
+ /**
+  * Maintains the mean, sample variance, and standard deviation of the most recently added values over a fixed-size window. Apache commons math3 has a
+  * class called DescriptiveStatistics that supports windows, but DescriptiveStatistics recomputes the statistics over the entire window each time they are
+  * requested. In a test over 1,000,000 entries with a window size of 1019 that requested stats for each entry, this class took ~50ms and
+  * DescriptiveStatistics took ~6,000ms.
+  *
+  * <p>
+  * Until the window fills for the first time, statistics are computed exactly over the values added so far (all statistics are 0 before any value is
+  * added). After that, each added value replaces the oldest one and updates the statistics incrementally in O(1). Because the incremental update drifts, a
+  * read forces an exact recompute over the whole window when the variance has gone negative or at least 100 values have been added since the last
+  * recompute.
+  *
+  * <p>
+  * This class may not be as accurate as DescriptiveStatistics. In unit tests it is within 1/1000 of DescriptiveStatistics.
+  *
+  * <p>
+  * The window size is not validated. The incremental update divides by {@code windowSize - 1}, and a window size of 0 makes {@link #addValue(long)} index
+  * an empty array, so callers should use a window size of at least 2.
+  */
+ class RollingStats {
+   private int position; // index of the window slot the next value overwrites; wraps to 0
+   private double window[]; // circular buffer of the most recent values
+ 
+   private double average; // mean of the window (of the values added so far until it first fills)
+   private double variance; // sample variance; updated incrementally once the window is full, so it can drift
+   private double stddev; // square root of variance
+ 
+   // true once window.length values have been added; from then on addValue() uses the incremental update()
+   private boolean windowFull;
+ 
+   private int recomputeCounter = 0; // values added since the last exact recompute
+ 
+   RollingStats(int windowSize) { // NOTE(review): windowSize is not validated; values below 2 break update() or addValue()
+     this.windowFull = false;
+     this.position = 0;
+     this.window = new double[windowSize];
+   }
+ 
+   /**
+    * Replaces {@code oldValue} with {@code newValue} in a full window of {@code windowSize} values, updating the mean, sample variance, and standard
+    * deviation in O(1) without rescanning the window.
+    *
+    * @see <a href= "http://jonisalonen.com/2014/efficient-and-accurate-rolling-standard-deviation/">Efficient and accurate rolling standard deviation</a>
+    */
+   private void update(double newValue, double oldValue, int windowSize) {
+     double delta = newValue - oldValue;
+ 
+     double oldAverage = average;
+     average = average + delta / windowSize;
+     variance += delta * (newValue - average + oldValue - oldAverage) / (windowSize - 1); // n - 1 denominator; can drift slightly negative
 -    stddev = Math.sqrt(variance);
++    stddev = FastMath.sqrt(variance); // NaN if variance drifted negative; computeStats() recomputes on the next read in that case
+   }
+ 
+   void addValue(long stat) { // adds stat, evicting the oldest value once the window is full
+ 
+     double old = window[position]; // value being evicted; only used once the window is full
+     window[position] = stat;
+     position++;
+     recomputeCounter++;
+ 
+     if (windowFull) { // steady state: O(1) incremental update
+       update(stat, old, window.length);
+     } else if (position == window.length) { // window filled for the first time: compute exactly
+       computeStats(window.length);
+       windowFull = true;
+     }
+ 
+     if (position == window.length) { // wrap around the circular buffer
+       position = 0;
+     }
+   }
+ 
+   private void computeStats(int len) { // exact mean/variance over window[0, len); resets the drift counter
+     average = StatUtils.mean(window, 0, len);
+     variance = StatUtils.variance(window, average, 0, len); // bias-corrected (n - 1) sample variance
 -    stddev = Math.sqrt(variance);
++    stddev = FastMath.sqrt(variance);
+     recomputeCounter = 0;
+   }
+ 
+   private void computeStats() { // brings the statistics up to date before they are read
+     if (windowFull) {
+       if (variance < 0 || recomputeCounter >= 100) {
+         // incremental computation drifts over time, so periodically force a recompute
+         computeStats(window.length);
+       }
+     } else if (recomputeCounter > 0) { // not full yet: recompute exactly over the values added since construction
+       computeStats(position);
+     }
+   }
+ 
+   double getMean() { // 0 before any value is added
+     computeStats();
+     return average;
+   }
+ 
+   double getVariance() { // sample variance of the values in the window
+     computeStats();
+     return variance;
+   }
+ 
+   double getStandardDeviation() { // square root of getVariance()
+     computeStats();
+     return stddev;
+   }
+ 
+   boolean isWindowFull() { // does not trigger a recompute
+     return windowFull;
+   }
+ }
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
index 0000000,28e1b20..4f8fcd1
mode 000000,100644..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
@@@ -1,0 -1,178 +1,205 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.accumulo.core.file.rfile;
+ 
+ import java.util.Random;
+ 
 -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
++import org.apache.commons.math3.distribution.NormalDistribution;
++import org.apache.commons.math3.distribution.ZipfDistribution;
++import org.apache.commons.math3.random.Well19937c;
++import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import com.google.common.math.DoubleMath;
+ 
+ public class RolllingStatsTest { // NOTE(review): "Rollling" (three l's) is a typo; fixing it also requires renaming the file
+ 
+   private static final double TOLERANCE = 1.0 / 1000; // absolute (not relative) tolerance for DoubleMath.fuzzyEquals
+ 
+   private static void assertFuzzyEquals(double expected, double actual) {
+     Assert.assertTrue(String.format("expected: %f, actual: %f diff: %f", expected, actual, Math.abs(expected - actual)),
+         DoubleMath.fuzzyEquals(expected, actual, TOLERANCE));
+   }
+ 
+   private static void checkAgreement(DescriptiveStatistics ds, RollingStats rs) { // rs must match ds within TOLERANCE and all stats must be non-negative
+     // getting stats from ds is expensive, so fetch each one only once; otherwise the unit test takes ~11 sec
+     // instead of ~5 sec
+     double expMean = ds.getMean();
+     double expVar = ds.getVariance();
+     double expStdDev = Math.sqrt(expVar);
+ 
+     assertFuzzyEquals(expMean, rs.getMean());
+     assertFuzzyEquals(expVar, rs.getVariance());
+     assertFuzzyEquals(expStdDev, rs.getStandardDeviation());
+ 
+     Assert.assertTrue(expMean >= 0);
+     Assert.assertTrue(rs.getMean() >= 0);
+     Assert.assertTrue(expVar >= 0);
+     Assert.assertTrue(rs.getVariance() >= 0);
+     Assert.assertTrue(expStdDev >= 0);
+     Assert.assertTrue(rs.getStandardDeviation() >= 0);
+   }
+ 
+   private static class StatTester { // feeds the same values to a DescriptiveStatistics reference and two RollingStats instances
+ 
+     Random rand = new Random(42); // seeded, so which adds trigger an rsp check is reproducible
+     private DescriptiveStatistics ds; // reference implementation
+     private RollingStats rs; // compared with ds after every addValue()
+     private RollingStats rsp; // compared with ds only occasionally and in check(), so many adds happen between reads
+ 
+     StatTester(int windowSize) {
+       ds = new DescriptiveStatistics();
+       ds.setWindowSize(windowSize);
+ 
+       rs = new RollingStats(windowSize);
+       rsp = new RollingStats(windowSize);
+     }
+ 
+     void addValue(long v) {
+       ds.addValue(v);
+       rs.addValue(v);
+       rsp.addValue(v);
+       checkAgreement(ds, rs);
+ 
+       if (rand.nextDouble() < 0.001) { // roughly 0.1% of adds
+         checkAgreement(ds, rsp);
+       }
+     }
+ 
+     void check() { // final comparison for rsp
+       checkAgreement(ds, rsp);
+     }
+   }
+ 
+   @Test
+   public void testFewSizes() { // a few distinct sizes, one of them a large outlier
+     StatTester st = new StatTester(1019);
+     int[] keySizes = new int[] {103, 113, 123, 2345};
+     Random rand = new Random(42);
+     for (int i = 0; i < 10000; i++) {
+       st.addValue(keySizes[rand.nextInt(keySizes.length)]);
+     }
+     st.check();
+   }
+ 
+   @Test
+   public void testConstant() { // constant input: true variance is 0, so incremental drift may push it negative
+ 
+     StatTester st = new StatTester(1019);
+ 
+     for (int i = 0; i < 10000; i++) {
+       st.addValue(111);
+     }
+ 
+     st.check();
+   }
+ 
+   @Test
+   public void testUniformIncreasing() { // NOTE(review): values are uniform in [200, 250), not increasing; "Increasing" presumably refers to the growing window sizes — confirm
+ 
+     for (int windowSize : new int[] {10, 13, 20, 100, 500}) {
+ 
+       StatTester st = new StatTester(windowSize);
+ 
+       Random rand = new Random(); // NOTE(review): unseeded, so a failure here is not reproducible
+ 
+       for (int i = 0; i < 1000; i++) {
+         int v = 200 + rand.nextInt(50);
+ 
+         st.addValue(v);
+       }
+ 
+       st.check();
+     }
+   }
+ 
+   @Test
+   public void testSlowIncreases() {
+     // key length; 3000 keys of each length are added before it grows by ~10%
+     int len = 100;
+ 
+     StatTester st = new StatTester(1019);
+ 
+     for (int i = 0; i < 50; i++) {
+       for (int j = 0; j < 3000; j++) {
+         st.addValue(len);
+       }
+ 
+       len = (int) (len * 1.1);
+     }
+ 
+     st.check();
+   }
+ 
+   @Test
++  public void testZipf() { // heavy-tailed key sizes
++    ZipfDistribution zd = new ZipfDistribution(new Well19937c(42), 1000, 2); // Zipf over 1..1000 with exponent 2, scaled by 100 below
++    StatTester st = new StatTester(2017);
++
++    for (int i = 0; i < 7000; i++) {
++      st.addValue(zd.sample() * 100);
++    }
++
++    st.check();
++  }
++
++  @Test
++  public void testNormal() {
++    NormalDistribution nd = new NormalDistribution(new Well19937c(42), 200, 20); // mean 200, standard deviation 20
++    StatTester st = new StatTester(2017);
++
++    for (int i = 0; i < 7000; i++) {
++      st.addValue((int) nd.sample());
++    }
++
++    st.check();
++  }
++
++  @Test
+   public void testSpikes() { // long runs of small keys interrupted by rare, very large keys
+ 
+     Random rand = new Random(); // NOTE(review): unseeded, so a failure here is not reproducible
+ 
+     StatTester st = new StatTester(3017);
+ 
+     for (int i = 0; i < 13; i++) {
+ 
+       // write small keys
+       int numSmall = 1000 + rand.nextInt(1000);
+       for (int s = 0; s < numSmall; s++) {
+         int sks = 50 + rand.nextInt(100);
+         // simulate row with multiple cols
+         for (int c = 0; c < 3; c++) {
+           st.addValue(sks);
+         }
+       }
+ 
+       // write a few large keys
+       int numLarge = 1 + rand.nextInt(1); // NOTE(review): nextInt(1) always returns 0, so numLarge is always 1
+       for (int l = 0; l < numLarge; l++) {
+         int lks = 500000 + rand.nextInt(1000000);
+         for (int c = 0; c < 3; c++) {
+           st.addValue(lks);
+         }
+       }
+     }
+ 
+     st.check();
+   }
+ }

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.