Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/07/08 19:32:06 UTC
[3/5] accumulo git commit: Merge branch '1.6' into 1.7
Merge branch '1.6' into 1.7
Conflicts:
core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4d26943e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4d26943e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4d26943e
Branch: refs/heads/master
Commit: 4d26943e519f62744ba0cf1637ae32153957d53c
Parents: dffb1c5 c5ac9f3
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jul 8 14:39:02 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jul 8 14:39:02 2016 -0400
----------------------------------------------------------------------
.../core/file/rfile/bcfile/Compression.java | 312 +++++++++++++++----
.../core/file/rfile/bcfile/CompressionTest.java | 244 +++++++++++++++
2 files changed, 497 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4d26943e/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 48a9acc,3b82462..459fed5
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@@ -78,11 -86,39 +86,39 @@@ public final class Compression
public static final String COMPRESSION_NONE = "none";
/**
- * Compression algorithms.
+ * Compression algorithms. A static initializer, below the values defined in the enumeration, initializes the default codec of every algorithm defined within
+ * the Algorithm enum. The intended call graph is therefore initialization by the static initializer, followed by calls to getCodec() and
+ * createCompressionStream/createDecompressionStream. In some cases, the compression and decompression methods are called with a different buffer size for the
+ * stream. If the compressed buffer size requested in these calls is zero, we do not set the buffer size for that algorithm and instead use the default
+ * defined within the codec.
+ *
+ * The buffer size is configured in the Codec by way of a Hadoop Configuration reference. One approach would be to share a single Configuration object, but
+ * that object would have to be changed whenever createCompressionStream or createDecompressionStream is called with a non-default buffer size. Concurrent
+ * calls to those methods would then mutate the configuration object beneath each other, requiring synchronization to avoid co-modification. To avoid
+ * synchronization entirely, we create Codecs with their own Configuration object and cache them for re-use. A default codec is created statically, as
+ * mentioned above, to ensure a codec is always available at loader initialization.
+ *
+ * A Guava cache defined within Algorithm allows us to cache Codecs for re-use. Since each cached codec has its own Configuration object and thus never needs
+ * to be mutated, there is no concern about using them concurrently; the Guava cache simply bounds the number of cached codecs and provides efficient,
+ * concurrent read/write access to the cache itself.
+ *
+ * To summarize the Algorithm-specific details implemented below:
+ *
+ * LZO will always have the default LZO codec because the buffer size is never overridden within it.
+ *
+ * GZ will use the default GZ codec for the compression stream, but can potentially use a different codec instance for the decompression stream if the
+ * requested buffer size does not match the default GZ buffer size of 32k.
+ *
+ * Snappy will use the default Snappy codec with the default buffer size of 64k for the compression stream, but will use a cached codec if the buffer size
+ * differs from the default.
*/
public static enum Algorithm {
+
LZO(COMPRESSION_LZO) {
- private transient boolean checked = false;
+ /**
+ * Determines whether we've checked the codec status; ensures we don't recreate the default codec
+ */
- private transient AtomicBoolean checked = new AtomicBoolean(false);
++ private final AtomicBoolean checked = new AtomicBoolean(false);
private static final String defaultClazz = "org.apache.hadoop.io.compress.LzoCodec";
private transient CompressionCodec codec = null;
@@@ -102,12 -137,34 +137,35 @@@
return codec != null;
}
+ @Override
- CompressionCodec getCodec() throws IOException {
- if (!isSupported()) {
- throw new IOException("LZO codec class not specified. Did you forget to set property " + CONF_LZO_CLASS + "?");
+ public void initializeDefaultCodec() {
+ if (!checked.get()) {
+ checked.set(true);
+ codec = createNewCodec(DEFAULT_BUFFER_SIZE);
+ }
+ }
+
+ @Override
+ CompressionCodec createNewCodec(int bufferSize) {
+ String extClazz = (conf.get(CONF_LZO_CLASS) == null ? System.getProperty(CONF_LZO_CLASS) : null);
+ String clazz = (extClazz != null) ? extClazz : defaultClazz;
+ try {
+ LOG.info("Trying to load Lzo codec class: " + clazz);
+ Configuration myConf = new Configuration(conf);
+ // only use the bufferSize if > 0, otherwise we'll use
+ // the default defined within the codec
+ if (bufferSize > 0)
+ myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+ return codec;
+ } catch (ClassNotFoundException e) {
+ // that is okay
}
+ return null;
+ }
+ @Override
+ CompressionCodec getCodec() throws IOException {
return codec;
}
@@@ -160,12 -222,42 +223,43 @@@
}
@Override
- public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException {
+ public void initializeDefaultCodec() {
+ codec = (DefaultCodec) createNewCodec(DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Create a new GZ codec
+ *
+ * @param bufferSize
+ * buffer size to use for GZ
+ * @return created codec
+ */
++ @Override
+ protected CompressionCodec createNewCodec(final int bufferSize) {
+ DefaultCodec myCodec = new DefaultCodec();
+ Configuration myConf = new Configuration(conf);
+ // only use the bufferSize if > 0, otherwise we'll use
+ // the default defined within the codec
+ if (bufferSize > 0)
+ myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+ myCodec.setConf(myConf);
+ return myCodec;
+ }
+
+ @Override
+ public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException {
// Set the internal buffer size to read from down stream.
- if (downStreamBufferSize > 0) {
- codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
+ CompressionCodec decomCodec = codec;
+ // if we're not using the default, let's pull from the loading cache
+ if (DEFAULT_BUFFER_SIZE != downStreamBufferSize) {
+ Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(GZ, downStreamBufferSize);
+ try {
+ decomCodec = codecCache.get(sizeOpt);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
}
- CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
+ CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
return bis2;
}
@@@ -204,8 -296,16 +298,18 @@@
return downStream;
}
+ @Override
- public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
+ public void initializeDefaultCodec() {
+
+ }
+
++ @Override
+ protected CompressionCodec createNewCodec(final int bufferSize) {
+ return null;
+ }
+
+ @Override
+ public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
if (downStreamBufferSize > 0) {
return new BufferedOutputStream(downStream, downStreamBufferSize);
}
@@@ -222,19 -322,65 +326,67 @@@
SNAPPY(COMPRESSION_SNAPPY) {
// Use base type to avoid compile-time dependencies.
private transient CompressionCodec snappyCodec = null;
- private transient boolean checked = false;
+ /**
+ * Determines whether we've checked the codec status; ensures we don't recreate the default codec
+ */
- private transient AtomicBoolean checked = new AtomicBoolean(false);
++ private final AtomicBoolean checked = new AtomicBoolean(false);
private static final String defaultClazz = "org.apache.hadoop.io.compress.SnappyCodec";
+ /**
+ * Buffer size option
+ */
+ private static final String BUFFER_SIZE_OPT = "io.compression.codec.snappy.buffersize";
+
+ /**
+ * Default buffer size value
+ */
+ private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+ @Override
public CompressionCodec getCodec() throws IOException {
- if (!isSupported()) {
- throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?");
- }
return snappyCodec;
}
@Override
- public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
+ public void initializeDefaultCodec() {
+ if (!checked.get()) {
+ checked.set(true);
+ snappyCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
+ }
+ }
+
+ /**
+ * Creates a new snappy codec.
+ *
+ * @param bufferSize
+ * incoming buffer size
+ * @return the new codec, or null if the codec class is not installed
+ */
++ @Override
+ protected CompressionCodec createNewCodec(final int bufferSize) {
+
+ String extClazz = (conf.get(CONF_SNAPPY_CLASS) == null ? System.getProperty(CONF_SNAPPY_CLASS) : null);
+ String clazz = (extClazz != null) ? extClazz : defaultClazz;
+ try {
+ LOG.info("Trying to load snappy codec class: " + clazz);
+
+ Configuration myConf = new Configuration(conf);
+ // only use the bufferSize if > 0, otherwise we'll use
+ // the default defined within the codec
+ if (bufferSize > 0)
+ myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+
+ return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+ } catch (ClassNotFoundException e) {
+ // that is okay
+ }
+
+ return null;
+ }
+
+ @Override
+ public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
if (!isSupported()) {
throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?");
@@@ -280,9 -425,37 +431,38 @@@
return snappyCodec != null;
}
};
+
+ /**
+ * The static block below creates a singleton codec for each algorithm defined in the Algorithm enumeration. Once the codecs exist, each call to isSupported
+ * returns true or false depending on whether that codec singleton is defined; the static initializer ensures this happens when the enumeration is loaded.
+ * Calls to getCodec return the singleton, whether it is null or not.
+ *
+ * Calls to createCompressionStream and createDecompressionStream may use a different codec than getCodec if the incoming downStreamBufferSize differs from
+ * the default. In that case, the resulting codec is placed into the codecCache, defined below, so that it can be re-used.
+ *
+ * Since codecs are immutable, there is no concern about concurrent access to the CompressionCodec objects within the Guava cache.
+ */
+ static {
+ conf = new Configuration();
+ for (final Algorithm al : Algorithm.values()) {
+ al.initializeDefaultCodec();
+ }
+ }
+
+ /**
+ * Guava cache providing a bounded factory for codecs created within the Algorithm enum.
+ */
+ private static LoadingCache<Entry<Algorithm,Integer>,CompressionCodec> codecCache = CacheBuilder.newBuilder().maximumSize(25)
+ .build(new CacheLoader<Entry<Algorithm,Integer>,CompressionCodec>() {
++ @Override
+ public CompressionCodec load(Entry<Algorithm,Integer> key) {
+ return key.getKey().createNewCodec(key.getValue());
+ }
+ });
+
// We require that all compression related settings are configured
// statically in the Configuration object.
- protected static final Configuration conf = new Configuration();
+ protected static final Configuration conf;
private final String compressName;
// data input buffer size to absorb small reads from application.
private static final int DATA_IBUF_SIZE = 1 * 1024;
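The caching model described in the merged javadoc can be sketched in isolation. Below is a minimal sketch, assuming only hadoop-common and Guava on the classpath; the class name CodecCacheSketch, the codecFor() method, and keying the cache by buffer size alone (rather than by an (Algorithm, buffer size) pair as the patch does) are illustrative simplifications, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class CodecCacheSketch {

  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;

  // One codec per requested buffer size. Each codec owns a private
  // Configuration, so cached codecs are never mutated after construction and
  // callers need no synchronization: the central point of the patch.
  private static final LoadingCache<Integer,CompressionCodec> CODECS = CacheBuilder.newBuilder().maximumSize(25)
      .build(new CacheLoader<Integer,CompressionCodec>() {
        @Override
        public CompressionCodec load(Integer bufferSize) {
          DefaultCodec codec = new DefaultCodec();
          Configuration conf = new Configuration();
          // zero means "keep the codec's own default", as in the patch
          if (bufferSize > 0) {
            conf.setInt("io.file.buffer.size", bufferSize);
          }
          codec.setConf(conf);
          return codec;
        }
      });

  // Statically created default, mirroring initializeDefaultCodec() above.
  private static final CompressionCodec DEFAULT_CODEC = CODECS.getUnchecked(DEFAULT_BUFFER_SIZE);

  /** Return the shared default codec, or a per-size codec from the cache. */
  public static CompressionCodec codecFor(int bufferSize) {
    return bufferSize == DEFAULT_BUFFER_SIZE ? DEFAULT_CODEC : CODECS.getUnchecked(bufferSize);
  }
}

The design point mirrored here is that a codec is never reconfigured after construction: a new buffer size means a new codec with its own Configuration, so readers and writers can share cached codecs without locks.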
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4d26943e/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
index 0000000,9615564..f3d6a22
mode 000000,100644..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
@@@ -1,0 -1,250 +1,244 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+ package org.apache.accumulo.core.file.rfile.bcfile;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+
+ import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.compress.CompressionCodec;
+ import org.apache.hadoop.util.ReflectionUtils;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
+ public class CompressionTest {
+
- HashMap<Compression.Algorithm,Boolean> isSupported = Maps.newHashMap();
++ HashMap<Compression.Algorithm,Boolean> isSupported = new HashMap<>();
+
+ @Before
+ public void testSupport() {
+ // we can safely assert that GZ exists by virtue of it being the DefaultCodec
+ isSupported.put(Compression.Algorithm.GZ, true);
+
+ Configuration myConf = new Configuration();
+
+ String extClazz = System.getProperty(Compression.Algorithm.CONF_LZO_CLASS);
+ String clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.LzoCodec";
+ try {
+ CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+ Assert.assertNotNull(codec);
+ isSupported.put(Compression.Algorithm.LZO, true);
+
+ } catch (ClassNotFoundException e) {
+ // that is okay
+ }
+
+ extClazz = System.getProperty(Compression.Algorithm.CONF_SNAPPY_CLASS);
+ clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.SnappyCodec";
+ try {
+ CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+ Assert.assertNotNull(codec);
+
+ isSupported.put(Compression.Algorithm.SNAPPY, true);
+
+ } catch (ClassNotFoundException e) {
+ // that is okay
+ }
+
+ }
+
+ @Test
+ public void testSingle() throws IOException {
+
+ for (final Algorithm al : Algorithm.values()) {
+ if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+ // first call to isSupported should be true
+ Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+
+ Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+
+ Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+ }
+ }
+ }
+
+ @Test
+ public void testSingleNoSideEffect() throws IOException {
+
+ for (final Algorithm al : Algorithm.values()) {
+ if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+ Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+
+ Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+
+ // assert that createNewCodec returns a new codec instance rather
+ // than the cached default codec
+
+ Assert.assertNotEquals(al + " should have created a new codec, but did not", System.identityHashCode(al.getCodec()), System.identityHashCode(al.createNewCodec(88 * 1024)));
+ }
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testManyStartNotNull() throws IOException, InterruptedException, ExecutionException {
+
+ for (final Algorithm al : Algorithm.values()) {
+ if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+ // first call to isSupported should be true
+ Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+
+ final CompressionCodec codec = al.getCodec();
+
+ Assert.assertNotNull(al + " should not be null", codec);
+
+ ExecutorService service = Executors.newFixedThreadPool(10);
+
- ArrayList<Future<Boolean>> results = Lists.newArrayList();
++ ArrayList<Future<Boolean>> results = new ArrayList<>();
+
+ for (int i = 0; i < 30; i++) {
- results.add(service.submit(new Callable<Boolean>()
-
- {
++ results.add(service.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ Assert.assertNotNull(al + " should not be null", al.getCodec());
+ return true;
+ }
+
+ }));
+ }
+
+ service.shutdown();
+
+ Assert.assertNotNull(al + " should not be null", codec);
+
+ while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+ // wait
+ }
+
+ for (Future<Boolean> result : results) {
+ Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get());
+ }
+ }
+ }
+
+ }
+
+ // don't start until we have created the codec
+ @Test(timeout = 60 * 1000)
+ public void testManyDontStartUntilThread() throws IOException, InterruptedException, ExecutionException {
+
+ for (final Algorithm al : Algorithm.values()) {
+ if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+ // first call to isSupported should be true
+ Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+
+ ExecutorService service = Executors.newFixedThreadPool(10);
+
- ArrayList<Future<Boolean>> results = Lists.newArrayList();
++ ArrayList<Future<Boolean>> results = new ArrayList<>();
+
+ for (int i = 0; i < 30; i++) {
+
+ results.add(service.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+ return true;
+ }
+
+ }));
+ }
+
+ service.shutdown();
+
+ while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+ // wait
+ }
+
+ for (Future<Boolean> result : results) {
+ Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get());
+ }
+ }
+ }
+
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testThereCanBeOnlyOne() throws IOException, InterruptedException, ExecutionException {
+
+ for (final Algorithm al : Algorithm.values()) {
+ if (isSupported.get(al) != null && isSupported.get(al) == true) {
+
+ // first call to isSupported should be true
+ Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+
+ ExecutorService service = Executors.newFixedThreadPool(20);
+
- ArrayList<Callable<Boolean>> list = Lists.newArrayList();
++ ArrayList<Callable<Boolean>> list = new ArrayList<>();
+
- ArrayList<Future<Boolean>> results = Lists.newArrayList();
++ ArrayList<Future<Boolean>> results = new ArrayList<>();
+
+ // keep track of the system's identity hashcodes; the set is synchronized because the callables below add to it from multiple pool threads.
- final HashSet<Integer> testSet = Sets.newHashSet();
++ final Set<Integer> testSet = Collections.synchronizedSet(new HashSet<Integer>());
+
+ for (int i = 0; i < 40; i++) {
+ list.add(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ CompressionCodec codec = al.getCodec();
+ Assert.assertNotNull(al + " resulted in a non-null codec", codec);
+ // add the identity hashcode to the set.
+ testSet.add(System.identityHashCode(codec));
+ return true;
+ }
+ });
+ }
+
+ results.addAll(service.invokeAll(list));
+ // ensure that only one codec instance was ever returned across all threads
+ Assert.assertEquals(al + " created too many codecs", 1, testSet.size());
+ service.shutdown();
+
+ while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+ // wait
+ }
+
+ for (Future<Boolean> result : results) {
+ Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get());
+ }
+ }
+ }
+ }
+
+ }
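For completeness, here is a hypothetical caller-side round trip through the GZ algorithm that exercises the cached-codec path the tests above verify. It is not part of the commit: the class name GzRoundTripSketch is invented, and it assumes the public getCompressor()/getDecompressor()/returnCompressor()/returnDecompressor() pool helpers that Accumulo's Compression.Algorithm carries over from Hadoop's TFile implementation.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

public class GzRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Algorithm gz = Algorithm.GZ;

    // Compress with a non-default buffer size (128k). Per the merged javadoc,
    // the decompression call below pulls a dedicated codec from the Guava
    // cache instead of mutating the shared default codec's Configuration.
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    Compressor compressor = gz.getCompressor();
    OutputStream out = gz.createCompressionStream(sink, compressor, 128 * 1024);
    out.write("hello, accumulo".getBytes(StandardCharsets.UTF_8));
    out.close();
    gz.returnCompressor(compressor);

    Decompressor decompressor = gz.getDecompressor();
    InputStream in = gz.createDecompressionStream(new ByteArrayInputStream(sink.toByteArray()), decompressor, 128 * 1024);
    byte[] buf = new byte[64];
    int n = in.read(buf);
    in.close();
    gz.returnDecompressor(decompressor);

    System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8)); // prints: hello, accumulo
  }
}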