Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/09/24 08:55:43 UTC

[GitHub] [drill] vvysotskyi commented on a change in pull request #2321: DRILL-7969: Read and write Parquet with brotli, lzo, lz4, zstd codecs

vvysotskyi commented on a change in pull request #2321:
URL: https://github.com/apache/drill/pull/2321#discussion_r715425598



##########
File path: contrib/storage-hive/hive-exec-shade/pom.xml
##########
@@ -120,6 +120,10 @@
           <groupId>org.codehaus.jackson</groupId>
           <artifactId>jackson-xc</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>io.airlift</groupId>
+          <artifactId>aircompressor</artifactId>
+        </exclusion>

Review comment:
       Is there any reason for excluding `aircompressor` here if you have relocated it below?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Stack;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import io.airlift.compress.Compressor;
+
+/**
+ * A shim making an aircompressor compressor available through the BytesInputCompressor
+ * interface.
+ */
+public class AirliftBytesInputCompressor implements CompressionCodecFactory.BytesInputCompressor {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AirliftBytesInputCompressor.class);

Review comment:
       Please either use imports instead of specifying the fully qualified class names here, or use Lombok.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
##########
@@ -0,0 +1,119 @@
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A delegating compression codec factory that returns (de)compressors based on
+ * https://github.com/airlift/aircompressor when possible and falls back to
+ * parquet-mr otherwise.  The aircompressor lib was introduced into Drill
+ * because of difficulties encountered with the JNI-based implementations of
+ * lzo, lz4 and zstd in parquet-mr.
+ *
+ * By modifying the constant AIRCOMPRESSOR_CODECS it is possible to choose
+ * which codecs should be routed to which lib.  In addition, this class
+ * implements parquet-mr's CompressionCodecFactory interface meaning that
+ * swapping this factory for e.g. one in parquet-mr will have minimal impact
+ * on the calling code in Parquet reading and writing parts of the Drill code
+ * base.
+ *
+ */
+public class DrillCompressionCodecFactory implements CompressionCodecFactory {
+
+  // The set of codecs to be handled by aircompressor
+  private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = new HashSet<>(
+      Arrays.asList(new CompressionCodecName[] { CompressionCodecName.LZ4, CompressionCodecName.LZO,

Review comment:
       `new CompressionCodecName[] {}` is redundant here; please remove it and pass the codecs directly to the `Arrays.asList()` method.
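For reference, `Arrays.asList()` is declared with a varargs parameter, so the compiler builds the array implicitly. A minimal JDK-only sketch comparing the forms follows; `Codec` is a hypothetical stand-in for parquet-mr's `CompressionCodecName`, and `EnumSet.of()` is shown only as an optional alternative, not something the review asked for.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;

public class CodecSetExample {
  // Hypothetical stand-in for parquet-mr's CompressionCodecName enum,
  // so that this sketch compiles with the JDK alone.
  enum Codec { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD }

  // Original form: an explicit array wrapped by Arrays.asList().
  static final Set<Codec> WITH_ARRAY = new HashSet<>(
      Arrays.asList(new Codec[] { Codec.LZ4, Codec.LZO, Codec.SNAPPY, Codec.ZSTD }));

  // Suggested form: Arrays.asList() takes varargs, so the array is implicit.
  static final Set<Codec> WITH_VARARGS = new HashSet<>(
      Arrays.asList(Codec.LZ4, Codec.LZO, Codec.SNAPPY, Codec.ZSTD));

  // Optional alternative for enum elements: EnumSet.of() is also varargs.
  static final Set<Codec> WITH_ENUMSET =
      EnumSet.of(Codec.LZ4, Codec.LZO, Codec.SNAPPY, Codec.ZSTD);

  public static void main(String[] args) {
    // All three sets contain the same four codecs.
    System.out.println(WITH_ARRAY.equals(WITH_VARARGS) && WITH_VARARGS.equals(WITH_ENUMSET));
  }
}
```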

##########
File path: exec/jdbc-all/pom.xml
##########
@@ -548,7 +548,7 @@
                   This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                   </message>
-                  <maxsize>46600000</maxsize>
+                  <maxsize>47200000</maxsize>

Review comment:
       Are you sure that these newly added libraries should be included in the JDBC jar? I think it would be better to exclude them here instead.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
##########
@@ -0,0 +1,119 @@
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A delegating compression codec factory that returns (de)compressors based on
+ * https://github.com/airlift/aircompressor when possible and falls back to
+ * parquet-mr otherwise.  The aircompressor lib was introduced into Drill
+ * because of difficulties encountered with the JNI-based implementations of
+ * lzo, lz4 and zstd in parquet-mr.
+ *
+ * By modifying the constant AIRCOMPRESSOR_CODECS it is possible to choose
+ * which codecs should be routed to which lib.  In addition, this class
+ * implements parquet-mr's CompressionCodecFactory interface meaning that
+ * swapping this factory for e.g. one in parquet-mr will have minimal impact
+ * on the calling code in Parquet reading and writing parts of the Drill code
+ * base.
+ *
+ */
+public class DrillCompressionCodecFactory implements CompressionCodecFactory {
+
+  // The set of codecs to be handled by aircompressor
+  private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = new HashSet<>(
+      Arrays.asList(new CompressionCodecName[] { CompressionCodecName.LZ4, CompressionCodecName.LZO,
+          CompressionCodecName.SNAPPY, CompressionCodecName.ZSTD }));
+
+  // pool of reused aircompressor compressors (parquet-mr's factory has its own)
+  private final Map<CompressionCodecName, BytesInputCompressor> compressors = new HashMap<>();
+
+  // pool of reused aircompressor decompressors (parquet-mr's factory has its own)
+  private final Map<CompressionCodecName, BytesInputDecompressor> decompressors = new HashMap<>();
+
+  // fallback parquet-mr compression codec factory
+  private CompressionCodecFactory parqCodecFactory;
+
+  // direct memory allocator to be used during (de)compression
+  private ByteBufferAllocator allocator;
+
+  // static builder method, solely to mimic the parquet-mr API as closely as possible
+  public static CompressionCodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator,
+      int pageSize) {
+    return new DrillCompressionCodecFactory(config, allocator, pageSize);
+  }
+
+  public DrillCompressionCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+    this.allocator = allocator;
+    this.parqCodecFactory = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
+  }
+
+  @Override
+  public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
+    if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
+      BytesInputCompressor comp = compressors.get(codecName);
+      if (comp == null) {
+        comp = new AirliftBytesInputCompressor(codecName, allocator);
+        compressors.put(codecName, comp);
+      }
+      return comp;

Review comment:
       Please use the `Map.computeIfAbsent()` method here and below.
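A sketch of how `getCompressor()` might read with `computeIfAbsent()`, assuming hypothetical stand-ins (`Codec`, `Compressor`, and a `Function` fallback in place of parquet-mr's `CodecFactory`) so that it compiles with the JDK alone. The mapping function only runs when no compressor is cached for the codec, which replaces the `get()` / null check / `put()` sequence; `getDecompressor()` could follow the same pattern.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class CompressorPoolExample {
  // Hypothetical stand-in for parquet-mr's CompressionCodecName.
  enum Codec { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD }

  // Hypothetical stand-in for parquet-mr's BytesInputCompressor.
  interface Compressor {
    String describe();
  }

  // Codecs routed to the pooled ("aircompressor") side; the rest go to the fallback.
  static final Set<Codec> POOLED_CODECS = EnumSet.of(Codec.LZ4, Codec.LZO, Codec.SNAPPY, Codec.ZSTD);

  private final Map<Codec, Compressor> compressors = new HashMap<>();
  private final Function<Codec, Compressor> fallback;
  int created = 0; // number of pooled compressors actually constructed

  CompressorPoolExample(Function<Codec, Compressor> fallback) {
    this.fallback = fallback;
  }

  Compressor getCompressor(Codec codec) {
    if (POOLED_CODECS.contains(codec)) {
      // Replaces get() / null check / put(): the mapping function runs only
      // when no compressor has been cached for this codec yet.
      return compressors.computeIfAbsent(codec, this::createPooledCompressor);
    }
    return fallback.apply(codec);
  }

  private Compressor createPooledCompressor(Codec codec) {
    created++;
    return () -> "pooled:" + codec;
  }

  public static void main(String[] args) {
    CompressorPoolExample pool = new CompressorPoolExample(c -> () -> "fallback:" + c);
    Compressor first = pool.getCompressor(Codec.ZSTD);
    Compressor second = pool.getCompressor(Codec.ZSTD);
    System.out.println(first == second);                           // true: reused from the pool
    System.out.println(pool.getCompressor(Codec.GZIP).describe()); // fallback:GZIP
  }
}
```

Like the original `HashMap`-based pool, the sketch assumes single-threaded use of the factory.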

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
##########
@@ -0,0 +1,102 @@
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Stack;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import io.airlift.compress.Compressor;
+
+/**
+ * A shim making an aircompressor compressor available through the BytesInputCompressor
+ * interface.
+ */
+public class AirliftBytesInputCompressor implements CompressionCodecFactory.BytesInputCompressor {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AirliftBytesInputCompressor.class);
+
+  // the codec used by this compressor
+  private CompressionCodecName codecName;
+
+  // the relevant aircompressor compressor
+  private Compressor airComp = null;
+
+  // the direct memory allocator to be used during compression
+  private ByteBufferAllocator allocator;
+
+  // stack tracking all direct memory buffers we allocated, and must release
+  private Stack<ByteBuffer> ourAllocations;
+
+  public AirliftBytesInputCompressor(CompressionCodecName codecName, ByteBufferAllocator allocator) {
+    this.codecName = codecName;
+
+    switch (codecName) {
+    case LZ4:
+      airComp = new io.airlift.compress.lz4.Lz4Compressor();

Review comment:
       Please use imports here and below too.
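The switch in question picks the aircompressor implementation for each codec. The sketch below shows the same shim shape with every class referenced through an import rather than a fully qualified name; it uses the JDK's `java.util.zip.Deflater` in place of aircompressor's compressors, and `BytesCompressor`, `Codec` and the `inflate` round-trip helper are hypothetical names chosen for illustration, not Drill or parquet-mr APIs.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class ShimExample {
  // Hypothetical target interface, standing in for parquet-mr's BytesInputCompressor.
  interface BytesCompressor {
    byte[] compress(byte[] input);
  }

  // Hypothetical codec names; only DEFLATE is backed by a JDK implementation here.
  enum Codec { DEFLATE, UNSUPPORTED }

  // The shim: selects the underlying compressor per codec, using imported classes.
  static BytesCompressor forCodec(Codec codec) {
    switch (codec) {
      case DEFLATE:
        return input -> {
          Deflater deflater = new Deflater();
          try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!deflater.finished()) {
              int n = deflater.deflate(chunk);
              out.write(chunk, 0, n);
            }
            return out.toByteArray();
          } finally {
            deflater.end(); // release native zlib resources
          }
        };
      default:
        throw new IllegalArgumentException("Unsupported codec: " + codec);
    }
  }

  // Round-trip helper used only to check the shim's output.
  static byte[] inflate(byte[] compressed, int originalLength) {
    Inflater inflater = new Inflater();
    try {
      inflater.setInput(compressed);
      byte[] result = new byte[originalLength];
      int off = 0;
      while (off < result.length) {
        int n = inflater.inflate(result, off, result.length - off);
        if (n == 0 && (inflater.finished() || inflater.needsInput() || inflater.needsDictionary())) {
          throw new IllegalStateException("compressed data ended early");
        }
        off += n;
      }
      return result;
    } catch (DataFormatException e) {
      throw new IllegalStateException("corrupt deflate stream", e);
    } finally {
      inflater.end();
    }
  }
}
```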

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
##########
@@ -17,18 +17,24 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import static org.apache.parquet.column.Encoding.valueOf;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;

Review comment:
       Please revert the changes that rearrange imports until we have settled on a consistent import ordering for the whole project.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
##########
@@ -0,0 +1,102 @@
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Stack;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import io.airlift.compress.Compressor;
+
+/**
+ * A shim making an aircompressor compressor available through the BytesInputCompressor
+ * interface.
+ */
+public class AirliftBytesInputCompressor implements CompressionCodecFactory.BytesInputCompressor {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AirliftBytesInputCompressor.class);
+
+  // the codec used by this compressor
+  private CompressionCodecName codecName;
+
+  // the relevant aircompressor compressor
+  private Compressor airComp = null;
+
+  // the direct memory allocator to be used during compression
+  private ByteBufferAllocator allocator;
+
+  // stack tracking all direct memory buffers we allocated, and must release
+  private Stack<ByteBuffer> ourAllocations;

Review comment:
       ```suggestion
         private Stack<ByteBuffer> allocatedBuffers;
       ```
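The renamed field holds every direct buffer the compressor allocated so that it can release them later. A sketch of that bookkeeping follows, assuming a hypothetical `Allocator` interface in place of parquet-mr's `ByteBufferAllocator` and releasing everything in `close()`. It uses `ArrayDeque` through the `Deque` interface, which the `java.util.Stack` Javadoc recommends in preference to `Stack`; that swap is optional and not part of the suggestion above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class BufferTrackingExample implements AutoCloseable {
  // Hypothetical stand-in for parquet-mr's ByteBufferAllocator.
  interface Allocator {
    ByteBuffer allocate(int size);
    void release(ByteBuffer buffer);
  }

  private final Allocator allocator;

  // Every buffer this object allocated and is responsible for releasing.
  private final Deque<ByteBuffer> allocatedBuffers = new ArrayDeque<>();

  BufferTrackingExample(Allocator allocator) {
    this.allocator = allocator;
  }

  // Allocates through the allocator and records the buffer for later release.
  ByteBuffer allocate(int size) {
    ByteBuffer buffer = allocator.allocate(size);
    allocatedBuffers.push(buffer);
    return buffer;
  }

  int liveBuffers() {
    return allocatedBuffers.size();
  }

  // Releases in reverse allocation order (LIFO), as popping a Stack would.
  @Override
  public void close() {
    while (!allocatedBuffers.isEmpty()) {
      allocator.release(allocatedBuffers.pop());
    }
  }
}
```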




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org