You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/12/01 02:12:45 UTC
[accumulo] 01/01: Merge branch '1.7' into 1.8
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 06cb5ed4299d386612fd214ad0a1ab75c156d685
Merge: ed313f7 c28e11c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Nov 30 20:49:01 2017 -0500
Merge branch '1.7' into 1.8
.../org/apache/accumulo/core/file/rfile/RFile.java | 9 +-
.../accumulo/core/file/rfile/RollingStats.java | 114 ++++++++++++
.../core/file/rfile/RolllingStatsTest.java | 205 +++++++++++++++++++++
3 files changed, 324 insertions(+), 4 deletions(-)
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 4539392,fe1c832..cda246a
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@@ -327,103 -282,108 +326,105 @@@ public class RFile
}
- public static class Writer implements FileSKVWriter {
-
- public static final int MAX_CF_IN_DLG = 1000;
- private static final double MAX_BLOCK_MULTIPLIER = 1.1;
-
- private BlockFileWriter fileWriter;
- private ABlockWriter blockWriter;
+ private static class SampleEntry {
+ Key key;
+ Value val;
- // private BlockAppender blockAppender;
- private final long blockSize;
- private final long maxBlockSize;
- private final int indexBlockSize;
- private int entries = 0;
-
- private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<>();
- private LocalityGroupMetadata currentLocalityGroup = null;
- private int nextBlock = 0;
+ SampleEntry(Key key, Value val) {
+ this.key = new Key(key);
+ this.val = new Value(val);
+ }
+ }
- private Key lastKeyInBlock = null;
+ private static class SampleLocalityGroupWriter {
- private boolean dataClosed = false;
- private boolean closed = false;
- private Key prevKey = new Key();
- private boolean startedDefaultLocalityGroup = false;
+ private Sampler sampler;
- private HashSet<ByteSequence> previousColumnFamilies;
+ private List<SampleEntry> entries = new ArrayList<>();
+ private long dataSize = 0;
- // Use windowed stats to fix ACCUMULO-4669
- private RollingStats keyLenStats = new RollingStats(2017);
- private double avergageKeySize = 0;
+ private LocalityGroupWriter lgr;
- public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
- this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+ public SampleLocalityGroupWriter(LocalityGroupWriter lgr, Sampler sampler) {
+ this.lgr = lgr;
+ this.sampler = sampler;
}
- public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
- this.blockSize = blockSize;
- this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
- this.indexBlockSize = indexBlockSize;
- this.fileWriter = bfw;
- this.blockWriter = null;
- previousColumnFamilies = new HashSet<>();
+ public void append(Key key, Value value) throws IOException {
+ if (sampler.accept(key)) {
+ entries.add(new SampleEntry(key, value));
+ dataSize += key.getSize() + value.getSize();
+ }
}
- @Override
- public synchronized void close() throws IOException {
-
- if (closed) {
- return;
+ public void close() throws IOException {
+ for (SampleEntry se : entries) {
+ lgr.append(se.key, se.val);
}
- closeData();
+ lgr.close();
+ }
- ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
+ public void flushIfNeeded() throws IOException {
+ if (dataSize > sampleBufferSize) {
+ // the reason to write out all but one key is so that closeBlock() can always eventually be called with true
+ List<SampleEntry> subList = entries.subList(0, entries.size() - 1);
- mba.writeInt(RINDEX_MAGIC);
- mba.writeInt(RINDEX_VER_7);
-
- if (currentLocalityGroup != null)
- localityGroups.add(currentLocalityGroup);
+ if (subList.size() > 0) {
+ for (SampleEntry se : subList) {
+ lgr.append(se.key, se.val);
+ }
- mba.writeInt(localityGroups.size());
+ lgr.closeBlock(subList.get(subList.size() - 1).key, false);
- for (LocalityGroupMetadata lc : localityGroups) {
- lc.write(mba);
+ subList.clear();
+ dataSize = 0;
+ }
}
+ }
+ }
- mba.close();
+ private static class LocalityGroupWriter {
- fileWriter.close();
+ private BlockFileWriter fileWriter;
+ private ABlockWriter blockWriter;
- closed = true;
- }
+ // private BlockAppender blockAppender;
+ private final long blockSize;
+ private final long maxBlockSize;
+ private int entries = 0;
- private void closeData() throws IOException {
+ private LocalityGroupMetadata currentLocalityGroup = null;
- if (dataClosed) {
- return;
- }
+ private Key lastKeyInBlock = null;
- dataClosed = true;
+ private Key prevKey = new Key();
- if (blockWriter != null) {
- closeBlock(lastKeyInBlock, true);
- }
+ private SampleLocalityGroupWriter sample;
+
- private SummaryStatistics keyLenStats = new SummaryStatistics();
++ // Use windowed stats to fix ACCUMULO-4669
++ private RollingStats keyLenStats = new RollingStats(2017);
+ private double avergageKeySize = 0;
+
+ LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup,
+ SampleLocalityGroupWriter sample) {
+ this.fileWriter = fileWriter;
+ this.blockSize = blockSize;
+ this.maxBlockSize = maxBlockSize;
+ this.currentLocalityGroup = currentLocalityGroup;
+ this.sample = sample;
}
private boolean isGiantKey(Key k) {
- // consider a key thats more than 3 standard deviations from previously seen key sizes as giant
- return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3;
+ double mean = keyLenStats.getMean();
+ double stddev = keyLenStats.getStandardDeviation();
+ return k.getSize() > mean + Math.max(9 * mean, 4 * stddev);
}
- @Override
public void append(Key key, Value value) throws IOException {
- if (dataClosed) {
- throw new IllegalStateException("Cannont append, data closed");
- }
-
if (key.compareTo(prevKey) < 0) {
- throw new IllegalStateException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
+ throw new IllegalArgumentException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
}
currentLocalityGroup.updateColumnCount(key);
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
index 0000000,d223574..c0c5554
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
@@@ -1,0 -1,113 +1,114 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+ package org.apache.accumulo.core.file.rfile;
+
-import org.apache.commons.math.stat.StatUtils;
++import org.apache.commons.math3.stat.StatUtils;
++import org.apache.commons.math3.util.FastMath;
+
+ /**
+ * This class supports efficient window statistics. Apache commons math3 has a class called DescriptiveStatistics that supports windows. DescriptiveStatistics
+ * recomputes the statistics over the entire window each time they are requested. In a test over 1,000,000 entries with a window size of 1019 that requested stats
+ * for each entry this class took ~50ms and DescriptiveStatistics took ~6,000ms.
+ *
+ * <p>
+ * This class may not be as accurate as DescriptiveStatistics. In unit tests it is within 1/1000 of DescriptiveStatistics.
+ */
+ class RollingStats {
+ private int position;
+ private double window[];
+
+ private double average;
+ private double variance;
+ private double stddev;
+
+ // indicates if the window is full
+ private boolean windowFull;
+
+ private int recomputeCounter = 0;
+
+ RollingStats(int windowSize) {
+ this.windowFull = false;
+ this.position = 0;
+ this.window = new double[windowSize];
+ }
+
+ /**
+ * @see <a href= "http://jonisalonen.com/2014/efficient-and-accurate-rolling-standard-deviation/">Efficient and accurate rolling standard deviation</a>
+ */
+ private void update(double newValue, double oldValue, int windowSize) {
+ double delta = newValue - oldValue;
+
+ double oldAverage = average;
+ average = average + delta / windowSize;
+ variance += delta * (newValue - average + oldValue - oldAverage) / (windowSize - 1);
- stddev = Math.sqrt(variance);
++ stddev = FastMath.sqrt(variance);
+ }
+
+ void addValue(long stat) {
+
+ double old = window[position];
+ window[position] = stat;
+ position++;
+ recomputeCounter++;
+
+ if (windowFull) {
+ update(stat, old, window.length);
+ } else if (position == window.length) {
+ computeStats(window.length);
+ windowFull = true;
+ }
+
+ if (position == window.length) {
+ position = 0;
+ }
+ }
+
+ private void computeStats(int len) {
+ average = StatUtils.mean(window, 0, len);
+ variance = StatUtils.variance(window, average, 0, len);
- stddev = Math.sqrt(variance);
++ stddev = FastMath.sqrt(variance);
+ recomputeCounter = 0;
+ }
+
+ private void computeStats() {
+ if (windowFull) {
+ if (variance < 0 || recomputeCounter >= 100) {
+ // incremental computation drifts over time, so periodically force a recompute
+ computeStats(window.length);
+ }
+ } else if (recomputeCounter > 0) {
+ computeStats(position);
+ }
+ }
+
+ double getMean() {
+ computeStats();
+ return average;
+ }
+
+ double getVariance() {
+ computeStats();
+ return variance;
+ }
+
+ double getStandardDeviation() {
+ computeStats();
+ return stddev;
+ }
+
+ boolean isWindowFull() {
+ return windowFull;
+ }
+ }
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
index 0000000,28e1b20..4f8fcd1
mode 000000,100644..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
@@@ -1,0 -1,178 +1,205 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.accumulo.core.file.rfile;
+
+ import java.util.Random;
+
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
++import org.apache.commons.math3.distribution.NormalDistribution;
++import org.apache.commons.math3.distribution.ZipfDistribution;
++import org.apache.commons.math3.random.Well19937c;
++import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+ import org.junit.Assert;
+ import org.junit.Test;
+
+ import com.google.common.math.DoubleMath;
+
+ public class RolllingStatsTest {
+
+ private static final double TOLERANCE = 1.0 / 1000;
+
+ private static void assertFuzzyEquals(double expected, double actual) {
+ Assert.assertTrue(String.format("expected: %f, actual: %f diff: %f", expected, actual, Math.abs(expected - actual)),
+ DoubleMath.fuzzyEquals(expected, actual, TOLERANCE));
+ }
+
+ private static void checkAgreement(DescriptiveStatistics ds, RollingStats rs) {
+ // getting stats from ds is expensive, so do it once... otherwise unit test takes 11 sec
+ // instead of 5 secs
+ double expMean = ds.getMean();
+ double expVar = ds.getVariance();
+ double expStdDev = Math.sqrt(expVar);
+
+ assertFuzzyEquals(expMean, rs.getMean());
+ assertFuzzyEquals(expVar, rs.getVariance());
+ assertFuzzyEquals(expStdDev, rs.getStandardDeviation());
+
+ Assert.assertTrue(expMean >= 0);
+ Assert.assertTrue(rs.getMean() >= 0);
+ Assert.assertTrue(expVar >= 0);
+ Assert.assertTrue(rs.getVariance() >= 0);
+ Assert.assertTrue(expStdDev >= 0);
+ Assert.assertTrue(rs.getStandardDeviation() >= 0);
+ }
+
+ private static class StatTester {
+
+ Random rand = new Random(42);
+ private DescriptiveStatistics ds;
+ private RollingStats rs;
+ private RollingStats rsp;
+
+ StatTester(int windowSize) {
+ ds = new DescriptiveStatistics();
+ ds.setWindowSize(windowSize);
+
+ rs = new RollingStats(windowSize);
+ rsp = new RollingStats(windowSize);
+ }
+
+ void addValue(long v) {
+ ds.addValue(v);
+ rs.addValue(v);
+ rsp.addValue(v);
+ checkAgreement(ds, rs);
+
+ if (rand.nextDouble() < 0.001) {
+ checkAgreement(ds, rsp);
+ }
+ }
+
+ void check() {
+ checkAgreement(ds, rsp);
+ }
+ }
+
+ @Test
+ public void testFewSizes() {
+ StatTester st = new StatTester(1019);
+ int[] keySizes = new int[] {103, 113, 123, 2345};
+ Random rand = new Random(42);
+ for (int i = 0; i < 10000; i++) {
+ st.addValue(keySizes[rand.nextInt(keySizes.length)]);
+ }
+ st.check();
+ }
+
+ @Test
+ public void testConstant() {
+
+ StatTester st = new StatTester(1019);
+
+ for (int i = 0; i < 10000; i++) {
+ st.addValue(111);
+ }
+
+ st.check();
+ }
+
+ @Test
+ public void testUniformIncreasing() {
+
+ for (int windowSize : new int[] {10, 13, 20, 100, 500}) {
+
+ StatTester st = new StatTester(windowSize);
+
+ Random rand = new Random();
+
+ for (int i = 0; i < 1000; i++) {
+ int v = 200 + rand.nextInt(50);
+
+ st.addValue(v);
+ }
+
+ st.check();
+ }
+ }
+
+ @Test
+ public void testSlowIncreases() {
+ // key length; 3000 keys are added at each length before it grows by roughly 10%
+ int len = 100;
+
+ StatTester st = new StatTester(1019);
+
+ for (int i = 0; i < 50; i++) {
+ for (int j = 0; j < 3000; j++) {
+ st.addValue(len);
+ }
+
+ len = (int) (len * 1.1);
+ }
+
+ st.check();
+ }
+
+ @Test
++ public void testZipf() {
++ ZipfDistribution zd = new ZipfDistribution(new Well19937c(42), 1000, 2);
++ StatTester st = new StatTester(2017);
++
++ for (int i = 0; i < 7000; i++) {
++ st.addValue(zd.sample() * 100);
++ }
++
++ st.check();
++ }
++
++ @Test
++ public void testNormal() {
++ NormalDistribution nd = new NormalDistribution(new Well19937c(42), 200, 20);
++ StatTester st = new StatTester(2017);
++
++ for (int i = 0; i < 7000; i++) {
++ st.addValue((int) nd.sample());
++ }
++
++ st.check();
++ }
++
++ @Test
+ public void testSpikes() {
+
+ Random rand = new Random();
+
+ StatTester st = new StatTester(3017);
+
+ for (int i = 0; i < 13; i++) {
+
+ // write small keys
+ int numSmall = 1000 + rand.nextInt(1000);
+ for (int s = 0; s < numSmall; s++) {
+ int sks = 50 + rand.nextInt(100);
+ // simulate row with multiple cols
+ for (int c = 0; c < 3; c++) {
+ st.addValue(sks);
+ }
+ }
+
+ // write large keys (numLarge is always 1 because rand.nextInt(1) only returns 0)
+ int numLarge = 1 + rand.nextInt(1);
+ for (int l = 0; l < numLarge; l++) {
+ int lks = 500000 + rand.nextInt(1000000);
+ for (int c = 0; c < 3; c++) {
+ st.addValue(lks);
+ }
+ }
+ }
+
+ st.check();
+ }
+ }
--
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.