You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/07/08 19:32:06 UTC

[3/5] accumulo git commit: Merge branch '1.6' into 1.7

Merge branch '1.6' into 1.7

Conflicts:
	core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4d26943e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4d26943e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4d26943e

Branch: refs/heads/master
Commit: 4d26943e519f62744ba0cf1637ae32153957d53c
Parents: dffb1c5 c5ac9f3
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jul 8 14:39:02 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jul 8 14:39:02 2016 -0400

----------------------------------------------------------------------
 .../core/file/rfile/bcfile/Compression.java     | 312 +++++++++++++++----
 .../core/file/rfile/bcfile/CompressionTest.java | 244 +++++++++++++++
 2 files changed, 497 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4d26943e/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 48a9acc,3b82462..459fed5
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@@ -78,11 -86,39 +86,39 @@@ public final class Compression 
    public static final String COMPRESSION_NONE = "none";
  
    /**
-    * Compression algorithms.
+    * Compression algorithms. There is a static initializer, below the values defined in the enumeration, that calls the initializer of all defined codecs within
+    * the Algorithm enum. This promotes a model of the following call graph of initialization by the static initializer, followed by calls to getCodec() and
+    * createCompressionStream/DecompressionStream. In some cases, the compression and decompression call methods will include a different buffer size for the
+    * stream. Note that if the compressed buffer size requested in these calls is zero, we will not set the buffer size for that algorithm. Instead, we will use
+    * the default within the codec.
+    *
+    * The buffer size is configured in the Codec by way of a Hadoop Configuration reference. One approach may be to use the same Configuration object, but when
+    * calls are made to createCompressionStream and DecompressionStream, with non default buffer sizes, the configuration object must be changed. In this case,
+    * concurrent calls to createCompressionStream and DecompressionStream would mutate the configuration object beneath each other, requiring synchronization to
+    * avoid undesirable activity via co-modification. To avoid synchronization entirely, we will create Codecs with their own Configuration object and cache them
+    * for re-use. A default codec will be statically created, as mentioned above to ensure we always have a codec available at loader initialization.
+    *
+    * There is a Guava cache defined within Algorithm that allows us to cache Codecs for re-use. Since they will have their own configuration object and thus do
+    * not need to be mutable, there is no concern for using them concurrently; however, the Guava cache exists to ensure a maximal size of the cache and
+    * efficient and concurrent read/write access to the cache itself.
+    *
+    * To provide Algorithm specific details and to describe what is in code:
+    *
+    * LZO will always have the default LZO codec because the buffer size is never overridden within it.
+    *
+    * GZ will use the default GZ codec for the compression stream, but can potentially use a different codec instance for the decompression stream if the
+    * requested buffer size does not match the default GZ buffer size of 32k.
+    *
+    * Snappy will use the default Snappy codec with the default buffer size of 64k for the compression stream, but will use a cached codec if the buffer size
+    * differs from the default.
     */
    public static enum Algorithm {
+ 
      LZO(COMPRESSION_LZO) {
-       private transient boolean checked = false;
+       /**
+        * determines if we've checked the codec status. ensures we don't recreate the defualt codec
+        */
 -      private transient AtomicBoolean checked = new AtomicBoolean(false);
++      private final AtomicBoolean checked = new AtomicBoolean(false);
        private static final String defaultClazz = "org.apache.hadoop.io.compress.LzoCodec";
        private transient CompressionCodec codec = null;
  
@@@ -102,12 -137,34 +137,35 @@@
          return codec != null;
        }
  
 +      @Override
-       CompressionCodec getCodec() throws IOException {
-         if (!isSupported()) {
-           throw new IOException("LZO codec class not specified. Did you forget to set property " + CONF_LZO_CLASS + "?");
+       public void initializeDefaultCodec() {
+         if (!checked.get()) {
+           checked.set(true);
+           codec = createNewCodec(DEFAULT_BUFFER_SIZE);
+         }
+       }
+ 
+       @Override
+       CompressionCodec createNewCodec(int bufferSize) {
+         String extClazz = (conf.get(CONF_LZO_CLASS) == null ? System.getProperty(CONF_LZO_CLASS) : null);
+         String clazz = (extClazz != null) ? extClazz : defaultClazz;
+         try {
+           LOG.info("Trying to load Lzo codec class: " + clazz);
+           Configuration myConf = new Configuration(conf);
+           // only use the buffersize if > 0, otherwise we'll use
+           // the default defined within the codec
+           if (bufferSize > 0)
+             myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+           codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+           return codec;
+         } catch (ClassNotFoundException e) {
+           // that is okay
          }
+         return null;
+       }
  
+       @Override
+       CompressionCodec getCodec() throws IOException {
          return codec;
        }
  
@@@ -160,12 -222,42 +223,43 @@@
        }
  
        @Override
-       public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException {
+       public void initializeDefaultCodec() {
+         codec = (DefaultCodec) createNewCodec(DEFAULT_BUFFER_SIZE);
+       }
+ 
+       /**
+        * Create a new GZ codec
+        *
+        * @param bufferSize
+        *          buffer size to for GZ
+        * @return created codec
+        */
++      @Override
+       protected CompressionCodec createNewCodec(final int bufferSize) {
+         DefaultCodec myCodec = new DefaultCodec();
+         Configuration myConf = new Configuration(conf);
+         // only use the buffersize if > 0, otherwise we'll use
+         // the default defined within the codec
+         if (bufferSize > 0)
+           myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+         myCodec.setConf(myConf);
+         return myCodec;
+       }
+ 
+       @Override
+       public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException {
          // Set the internal buffer size to read from down stream.
-         if (downStreamBufferSize > 0) {
-           codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
+         CompressionCodec decomCodec = codec;
+         // if we're not using the default, let's pull from the loading cache
+         if (DEFAULT_BUFFER_SIZE != downStreamBufferSize) {
+           Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(GZ, downStreamBufferSize);
+           try {
+             decomCodec = codecCache.get(sizeOpt);
+           } catch (ExecutionException e) {
+             throw new IOException(e);
+           }
          }
-         CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
+         CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
          BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
          return bis2;
        }
@@@ -204,8 -296,16 +298,18 @@@
          return downStream;
        }
  
 +      @Override
-       public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
+       public void initializeDefaultCodec() {
+ 
+       }
+ 
++      @Override
+       protected CompressionCodec createNewCodec(final int bufferSize) {
+         return null;
+       }
+ 
+       @Override
+       public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
          if (downStreamBufferSize > 0) {
            return new BufferedOutputStream(downStream, downStreamBufferSize);
          }
@@@ -222,19 -322,65 +326,67 @@@
      SNAPPY(COMPRESSION_SNAPPY) {
        // Use base type to avoid compile-time dependencies.
        private transient CompressionCodec snappyCodec = null;
-       private transient boolean checked = false;
+       /**
+        * determines if we've checked the codec status. ensures we don't recreate the defualt codec
+        */
 -      private transient AtomicBoolean checked = new AtomicBoolean(false);
++      private final AtomicBoolean checked = new AtomicBoolean(false);
        private static final String defaultClazz = "org.apache.hadoop.io.compress.SnappyCodec";
  
+       /**
+        * Buffer size option
+        */
+       private static final String BUFFER_SIZE_OPT = "io.compression.codec.snappy.buffersize";
+ 
+       /**
+        * Default buffer size value
+        */
+       private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ 
 +      @Override
        public CompressionCodec getCodec() throws IOException {
-         if (!isSupported()) {
-           throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?");
-         }
          return snappyCodec;
        }
  
        @Override
-       public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
+       public void initializeDefaultCodec() {
+         if (!checked.get()) {
+           checked.set(true);
+           snappyCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
+         }
+       }
+ 
+       /**
+        * Creates a new snappy codec.
+        *
+        * @param bufferSize
+        *          incoming buffer size
+        * @return new codec or null, depending on if installed
+        */
++      @Override
+       protected CompressionCodec createNewCodec(final int bufferSize) {
+ 
+         String extClazz = (conf.get(CONF_SNAPPY_CLASS) == null ? System.getProperty(CONF_SNAPPY_CLASS) : null);
+         String clazz = (extClazz != null) ? extClazz : defaultClazz;
+         try {
+           LOG.info("Trying to load snappy codec class: " + clazz);
+ 
+           Configuration myConf = new Configuration(conf);
+           // only use the buffersize if > 0, otherwise we'll use
+           // the default defined within the codec
+           if (bufferSize > 0)
+             myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+ 
+           return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+ 
+         } catch (ClassNotFoundException e) {
+           // that is okay
+         }
+ 
+         return null;
+       }
+ 
+       @Override
+       public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
  
          if (!isSupported()) {
            throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?");
@@@ -280,9 -425,37 +431,38 @@@
          return snappyCodec != null;
        }
      };
+ 
+     /**
+      * The model defined by the static block, below, creates a singleton for each defined codec in the Algorithm enumeration. By creating the codecs, each call
+      * to isSupported shall return true/false depending on if the codec singleton is defined. The static initializer, below, will ensure this occurs when the
+      * Enumeration is loaded. Furthermore, calls to getCodec will return the singleton, whether it is null or not.
+      *
+      * Calls to createCompressionStream and createDecompressionStream may return a different codec than getCodec, if the incoming downStreamBufferSize is
+      * different than the default. In such a case, we will place the resulting codec into the codecCache, defined below, to ensure we have cache codecs.
+      *
+      * Since codecs are immutable, there is no concern about concurrent access to the CompressionCodec objects within the guava cache.
+      */
+     static {
+       conf = new Configuration();
+       for (final Algorithm al : Algorithm.values()) {
+         al.initializeDefaultCodec();
+       }
+     }
+ 
+     /**
+      * Guava cache to have a limited factory pattern defined in the Algorithm enum.
+      */
+     private static LoadingCache<Entry<Algorithm,Integer>,CompressionCodec> codecCache = CacheBuilder.newBuilder().maximumSize(25)
+         .build(new CacheLoader<Entry<Algorithm,Integer>,CompressionCodec>() {
++          @Override
+           public CompressionCodec load(Entry<Algorithm,Integer> key) {
+             return key.getKey().createNewCodec(key.getValue());
+           }
+         });
+ 
      // We require that all compression related settings are configured
      // statically in the Configuration object.
-     protected static final Configuration conf = new Configuration();
+     protected static final Configuration conf;
      private final String compressName;
      // data input buffer size to absorb small reads from application.
      private static final int DATA_IBUF_SIZE = 1 * 1024;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4d26943e/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
index 0000000,9615564..f3d6a22
mode 000000,100644..100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
@@@ -1,0 -1,250 +1,244 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements. See the NOTICE file distributed with this
+  * work for additional information regarding copyright ownership. The ASF
+  * licenses this file to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  * License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ package org.apache.accumulo.core.file.rfile.bcfile;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.compress.CompressionCodec;
+ import org.apache.hadoop.util.ReflectionUtils;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
 -import com.google.common.collect.Lists;
 -import com.google.common.collect.Maps;
 -import com.google.common.collect.Sets;
 -
+ public class CompressionTest {
+ 
 -  HashMap<Compression.Algorithm,Boolean> isSupported = Maps.newHashMap();
++  HashMap<Compression.Algorithm,Boolean> isSupported = new HashMap<>();
+ 
+   @Before
+   public void testSupport() {
+     // we can safely assert that GZ exists by virtue of it being the DefaultCodec
+     isSupported.put(Compression.Algorithm.GZ, true);
+ 
+     Configuration myConf = new Configuration();
+ 
+     String extClazz = System.getProperty(Compression.Algorithm.CONF_LZO_CLASS);
+     String clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.LzoCodec";
+     try {
+       CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+ 
+       Assert.assertNotNull(codec);
+       isSupported.put(Compression.Algorithm.LZO, true);
+ 
+     } catch (ClassNotFoundException e) {
+       // that is okay
+     }
+ 
+     extClazz = System.getProperty(Compression.Algorithm.CONF_SNAPPY_CLASS);
+     clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.SnappyCodec";
+     try {
+       CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+ 
+       Assert.assertNotNull(codec);
+ 
+       isSupported.put(Compression.Algorithm.SNAPPY, true);
+ 
+     } catch (ClassNotFoundException e) {
+       // that is okay
+     }
+ 
+   }
+ 
+   @Test
+   public void testSingle() throws IOException {
+ 
+     for (final Algorithm al : Algorithm.values()) {
+       if (isSupported.get(al) != null && isSupported.get(al) == true) {
+ 
+         // first call to issupported should be true
+         Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+ 
+         Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+ 
+         Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+       }
+     }
+   }
+ 
+   @Test
+   public void testSingleNoSideEffect() throws IOException {
+ 
+     for (final Algorithm al : Algorithm.values()) {
+       if (isSupported.get(al) != null && isSupported.get(al) == true) {
+ 
+         Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+ 
+         Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+ 
+         // assert that additional calls to create will not create
+         // additional codecs
+ 
+         Assert.assertNotEquals(al + " should have created a new codec, but did not", System.identityHashCode(al.getCodec()), al.createNewCodec(88 * 1024));
+       }
+     }
+   }
+ 
+   @Test(timeout = 60 * 1000)
+   public void testManyStartNotNull() throws IOException, InterruptedException, ExecutionException {
+ 
+     for (final Algorithm al : Algorithm.values()) {
+       if (isSupported.get(al) != null && isSupported.get(al) == true) {
+ 
+         // first call to issupported should be true
+         Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+ 
+         final CompressionCodec codec = al.getCodec();
+ 
+         Assert.assertNotNull(al + " should not be null", codec);
+ 
+         ExecutorService service = Executors.newFixedThreadPool(10);
+ 
 -        ArrayList<Future<Boolean>> results = Lists.newArrayList();
++        ArrayList<Future<Boolean>> results = new ArrayList<>();
+ 
+         for (int i = 0; i < 30; i++) {
 -          results.add(service.submit(new Callable<Boolean>()
 -
 -          {
++          results.add(service.submit(new Callable<Boolean>() {
+ 
+             @Override
+             public Boolean call() throws Exception {
+               Assert.assertNotNull(al + " should not be null", al.getCodec());
+               return true;
+             }
+ 
+           }));
+         }
+ 
+         service.shutdown();
+ 
+         Assert.assertNotNull(al + " should not be null", codec);
+ 
+         while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+           // wait
+         }
+ 
+         for (Future<Boolean> result : results) {
+           Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get());
+         }
+       }
+     }
+ 
+   }
+ 
+   // don't start until we have created the codec
+   @Test(timeout = 60 * 1000)
+   public void testManyDontStartUntilThread() throws IOException, InterruptedException, ExecutionException {
+ 
+     for (final Algorithm al : Algorithm.values()) {
+       if (isSupported.get(al) != null && isSupported.get(al) == true) {
+ 
+         // first call to issupported should be true
+         Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+ 
+         ExecutorService service = Executors.newFixedThreadPool(10);
+ 
 -        ArrayList<Future<Boolean>> results = Lists.newArrayList();
++        ArrayList<Future<Boolean>> results = new ArrayList<>();
+ 
+         for (int i = 0; i < 30; i++) {
+ 
+           results.add(service.submit(new Callable<Boolean>() {
+ 
+             @Override
+             public Boolean call() throws Exception {
+               Assert.assertNotNull(al + " should have a non-null codec", al.getCodec());
+               return true;
+             }
+ 
+           }));
+         }
+ 
+         service.shutdown();
+ 
+         while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+           // wait
+         }
+ 
+         for (Future<Boolean> result : results) {
+           Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get());
+         }
+       }
+     }
+ 
+   }
+ 
+   @Test(timeout = 60 * 1000)
+   public void testThereCanBeOnlyOne() throws IOException, InterruptedException, ExecutionException {
+ 
+     for (final Algorithm al : Algorithm.values()) {
+       if (isSupported.get(al) != null && isSupported.get(al) == true) {
+ 
+         // first call to issupported should be true
+         Assert.assertTrue(al + " is not supported, but should be", al.isSupported());
+ 
+         ExecutorService service = Executors.newFixedThreadPool(20);
+ 
 -        ArrayList<Callable<Boolean>> list = Lists.newArrayList();
++        ArrayList<Callable<Boolean>> list = new ArrayList<>();
+ 
 -        ArrayList<Future<Boolean>> results = Lists.newArrayList();
++        ArrayList<Future<Boolean>> results = new ArrayList<>();
+ 
+         // keep track of the system's identity hashcodes.
 -        final HashSet<Integer> testSet = Sets.newHashSet();
++        final HashSet<Integer> testSet = new HashSet<>();
+ 
+         for (int i = 0; i < 40; i++) {
+           list.add(new Callable<Boolean>() {
+ 
+             @Override
+             public Boolean call() throws Exception {
+               CompressionCodec codec = al.getCodec();
+               Assert.assertNotNull(al + " resulted in a non-null codec", codec);
+               // add the identity hashcode to the set.
+               testSet.add(System.identityHashCode(codec));
+               return true;
+             }
+           });
+         }
+ 
+         results.addAll(service.invokeAll(list));
+         // ensure that we
+         Assert.assertEquals(al + " created too many codecs", 1, testSet.size());
+         service.shutdown();
+ 
+         while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+           // wait
+         }
+ 
+         for (Future<Boolean> result : results) {
+           Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get());
+         }
+       }
+     }
+   }
+ 
+ }