Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/07/08 14:24:18 UTC

[accumulo] branch main updated: Computes correct override class and adds unit test for both Hadoop and System property configuration. (#2800)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e56361501 Computes correct override class and adds unit test for both Hadoop and System property configuration. (#2800)
4e56361501 is described below

commit 4e56361501bf0e14016a76ca9e12e880f80b5091
Author: Dave Marion <dl...@apache.org>
AuthorDate: Fri Jul 8 10:24:11 2022 -0400

    Computes correct override class and adds unit test for both Hadoop and System property configuration. (#2800)
    
    Closes #2416
    
    Co-authored-by: Damon Brown <db...@trietop.com>
---
 .../file/rfile/bcfile/CompressionAlgorithm.java    |   8 +-
 .../core/file/rfile/bcfile/CompressionTest.java    |  40 +++++
 .../core/file/rfile/bcfile/DummyCodec.java         | 172 +++++++++++++++++++++
 3 files changed, 215 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java
index a42034db09..b049b8e5a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java
@@ -287,19 +287,17 @@ public class CompressionAlgorithm extends Configured {
    */
   private CompressionCodec createNewCodec(final String codecClazzProp, final String defaultClazz,
       final int bufferSize, final String bufferSizeConfigOpt) {
-    String extClazz = null;
+    String clazz = defaultClazz;
     if (codecClazzProp != null) {
-      extClazz =
-          (getConf().get(codecClazzProp) == null ? System.getProperty(codecClazzProp) : null);
+      clazz = System.getProperty(codecClazzProp, getConf().get(codecClazzProp, defaultClazz));
     }
-    String clazz = (extClazz != null) ? extClazz : defaultClazz;
     try {
       LOG.info("Trying to load codec class {}", clazz);
       Configuration config = new Configuration(getConf());
       updateBuffer(config, bufferSizeConfigOpt, bufferSize);
       return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), config);
     } catch (ClassNotFoundException e) {
-      // This is okay.
+      LOG.debug("Unable to load codec class {} for {}", clazz, codecClazzProp, e);
     }
     return null;
   }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
index 00e5b292b3..8f38b1b30e 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.spi.file.rfile.compression.Snappy;
 import org.apache.accumulo.core.spi.file.rfile.compression.ZStandard;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -302,4 +303,43 @@ public class CompressionTest {
     }
   }
 
+  @Test
+  public void testHadoopCodecOverride() {
+    Configuration conf = new Configuration(false);
+    conf.set(new ZStandard().getCodecClassNameProperty(), DummyCodec.class.getName());
+    CompressionAlgorithm algo = Compression.getCompressionAlgorithmByName("zstd");
+    algo.setConf(conf);
+    CompressionCodec dummyCodec = algo.createNewCodec(4096);
+    assertEquals(DummyCodec.class, dummyCodec.getClass(), "Hadoop override DummyCodec not loaded");
+  }
+
+  @Test
+  public void testSystemPropertyCodecOverride() {
+    System.setProperty(new Lz4().getCodecClassNameProperty(), DummyCodec.class.getName());
+    try {
+      CompressionAlgorithm algo = Compression.getCompressionAlgorithmByName("lz4");
+      CompressionCodec dummyCodec = algo.createNewCodec(4096);
+      assertEquals(DummyCodec.class, dummyCodec.getClass(),
+          "Hadoop override DummyCodec not loaded");
+    } finally {
+      System.clearProperty(new Lz4().getCodecClassNameProperty());
+    }
+  }
+
+  @Test
+  public void testSystemPropertyOverridesConf() {
+    System.setProperty(new Snappy().getCodecClassNameProperty(), DummyCodec.class.getName());
+    try {
+      Configuration conf = new Configuration(false);
+      conf.set(new Snappy().getCodecClassNameProperty(), SnappyCodec.class.getName());
+      CompressionAlgorithm algo = Compression.getCompressionAlgorithmByName("snappy");
+      algo.setConf(conf);
+      CompressionCodec dummyCodec = algo.createNewCodec(4096);
+      assertEquals(DummyCodec.class, dummyCodec.getClass(),
+          "Hadoop override DummyCodec not loaded");
+    } finally {
+      System.clearProperty(new Snappy().getCodecClassNameProperty());
+    }
+  }
+
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/DummyCodec.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/DummyCodec.java
new file mode 100644
index 0000000000..831cc2b97d
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/DummyCodec.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+public class DummyCodec implements Configurable, CompressionCodec {
+
+  Configuration conf;
+
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param conf
+   *          the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given {@link OutputStream}.
+   *
+   * @param out
+   *          the location for the final output stream
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   *           unsupported operation
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given {@link OutputStream} with
+   * the given {@link Compressor}.
+   *
+   * @param out
+   *          the location for the final output stream
+   * @param compressor
+   *          compressor to use
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   *           unsupported operation
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
+  @Override
+  public Compressor createCompressor() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given input stream.
+   *
+   * @param in
+   *          the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   *           unsupported operation
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given {@link InputStream} with
+   * the given {@link Decompressor}.
+   *
+   * @param in
+   *          the stream to read compressed bytes from
+   * @param decompressor
+   *          decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   *           unsupported operation
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   *
+   * @return <code>.dummy</code>.
+   */
+  @Override
+  public String getDefaultExtension() {
+    return ".dummy";
+  }
+
+}