Posted to commits@orc.apache.org by om...@apache.org on 2018/03/15 16:39:01 UTC

orc git commit: ORC-310: Improve error handling when compression codecs are reset. Also fix reuse of codecs via DataReader.clone().

Repository: orc
Updated Branches:
  refs/heads/master 3d44366a4 -> 7dfe4a748


ORC-310: Improve error handling when compression codecs are reset. Also fix
reuse of codecs via DataReader.clone().

Fixes #222

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/7dfe4a74
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/7dfe4a74
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/7dfe4a74

Branch: refs/heads/master
Commit: 7dfe4a7483145be109201f1dbeeffac875ccef1a
Parents: 3d44366
Author: sergey <se...@apache.org>
Authored: Wed Feb 28 16:00:51 2018 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu Mar 15 09:37:24 2018 -0700

----------------------------------------------------------------------
 .../src/java/org/apache/orc/DataReader.java     |  7 +++-
 .../java/org/apache/orc/impl/OrcCodecPool.java  | 37 ++++++++++-------
 .../org/apache/orc/impl/PhysicalFsWriter.java   | 10 ++++-
 .../org/apache/orc/impl/RecordReaderUtils.java  | 42 ++++++++++++--------
 4 files changed, 61 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java
index 3155862..ed3acbb 100644
--- a/java/core/src/java/org/apache/orc/DataReader.java
+++ b/java/core/src/java/org/apache/orc/DataReader.java
@@ -79,7 +79,10 @@ public interface DataReader extends AutoCloseable, Cloneable {
   @Override
   public void close() throws IOException;
 
-  /** Returns the compression codec used by this datareader. 
-   * @return */
+  /**
+   * Returns the compression codec used by this DataReader.
+   * We should consider removing this from the interface.
+   * @return the compression codec
+   */
   CompressionCodec getCompressionCodec();
 }
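
The javadoc note above exists because getCompressionCodec leaks pooled,
stateful objects to callers. A minimal sketch (a hypothetical caller, not
part of this patch) of the one safe use, treating the codec as a read-only
flag rather than decompressing with it:

    import org.apache.orc.DataReader;

    // Hypothetical helper: only inspect the returned codec, e.g. to check
    // whether the file is compressed; actual decompression should go
    // through InStream, which takes the codec and buffer size explicitly.
    static boolean isCompressed(DataReader reader) {
      return reader.getCompressionCodec() != null;
    }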

http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
index 56b9896..24ee618 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -18,10 +18,8 @@
 package org.apache.orc.impl;
 
 import java.util.concurrent.ConcurrentHashMap;
-
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.slf4j.Logger;
@@ -62,26 +60,35 @@ public final class OrcCodecPool {
     return codec;
   }
 
+  /**
+   * Returns the codec to the pool or closes it, suppressing exceptions.
+   * @param kind Compression kind.
+   * @param codec Codec.
+   */
   public static void returnCodec(CompressionKind kind, CompressionCodec codec) {
     if (codec == null) {
       return;
     }
     assert kind != CompressionKind.NONE;
-    codec.reset();
-    List<CompressionCodec> list = POOL.get(kind);
-    if (list == null) {
-      List<CompressionCodec> newList = new ArrayList<>();
-      List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList);
-      list = (oldList == null) ? newList : oldList;
-    }
-    synchronized (list) {
-      if (list.size() < MAX_PER_KIND) {
-        list.add(codec);
-        return;
+    try {
+      codec.reset();
+      List<CompressionCodec> list = POOL.get(kind);
+      if (list == null) {
+        List<CompressionCodec> newList = new ArrayList<>();
+        List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList);
+        list = (oldList == null) ? newList : oldList;
+      }
+      synchronized (list) {
+        if (list.size() < MAX_PER_KIND) {
+          list.add(codec);
+          return;
+        }
       }
+      // We didn't add the codec to the list.
+      codec.close();
+    } catch (Exception ex) {
+      LOG.error("Ignoring codec cleanup error", ex);
     }
-    // We didn't add the codec to the list.
-    codec.close();
   }
 
   public static int getPoolSize(CompressionKind kind) {
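
The try/catch above makes returnCodec safe to call unconditionally from
cleanup paths: if reset() or close() throws, the codec is dropped and the
error is logged instead of propagating into the caller's own error handling.
A minimal sketch of the intended caller pattern (the compression kind and
the work inside the try block are illustrative):

    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.OrcCodecPool;

    CompressionCodec codec = OrcCodecPool.getCodec(CompressionKind.ZLIB);
    try {
      // ... compress or decompress buffers with the codec ...
    } finally {
      // Exceptions from reset()/close() are caught and logged inside
      // returnCodec, so this cannot mask an exception from the try block.
      OrcCodecPool.returnCodec(CompressionKind.ZLIB, codec);
    }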

http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 918fae8..38ca40e 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -46,7 +46,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final HadoopShims shims = HadoopShimsFactory.get();
 
-  private final FSDataOutputStream rawWriter;
+  private FSDataOutputStream rawWriter;
   // the compressed metadata information outStream
   private OutStream writer = null;
   // a protobuf outStream around streamFactory
@@ -58,7 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
   private final double paddingTolerance;
   private final long defaultStripeSize;
   private final CompressionKind compress;
-  private final CompressionCodec codec;
+  private CompressionCodec codec;
   private final boolean addBlockPadding;
 
   // the streams that make up the current stripe
@@ -225,8 +225,14 @@ public class PhysicalFsWriter implements PhysicalWriter {
 
   @Override
   public void close() throws IOException {
+    // We don't use the codec directly, but we do hand it out via getCompressionCodec;
+    // it is used in tests, for boolean checks, and in StreamFactory. The changes that
+    // would get rid of this pattern require cross-project interface changes, so just
+    // return the codec to the pool for now.
     OrcCodecPool.returnCodec(compress, codec);
+    codec = null;
     rawWriter.close();
+    rawWriter = null;
   }
 
   @Override
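
Nulling codec and rawWriter after close() means an accidental second close()
fails fast with a NullPointerException rather than returning the same codec
to the pool twice, which could hand one codec instance to two writers. A
defensive variant (a sketch, not part of this patch) would make close()
idempotent instead:

    @Override
    public void close() throws IOException {
      if (codec != null) {
        OrcCodecPool.returnCodec(compress, codec);
        codec = null;
      }
      if (rawWriter != null) {
        rawWriter.close();
        rawWriter = null;
      }
    }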

http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 1e2d0f1..705e768 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,13 +17,15 @@
  */
 package org.apache.orc.impl;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +37,6 @@ import org.apache.orc.CompressionKind;
 import org.apache.orc.DataReader;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
-
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 
@@ -44,6 +45,7 @@ import org.apache.orc.TypeDescription;
  */
 public class RecordReaderUtils {
   private static final HadoopShims SHIMS = HadoopShimsFactory.get();
+  private static final Logger LOG = LoggerFactory.getLogger(RecordReaderUtils.class);
 
   static boolean hadBadBloomFilters(TypeDescription.Category category,
                                     OrcFile.WriterVersion version) {
@@ -143,12 +145,12 @@ public class RecordReaderUtils {
 
   private static class DefaultDataReader implements DataReader {
     private FSDataInputStream file = null;
-    private final ByteBufferAllocatorPool pool;
+    private ByteBufferAllocatorPool pool;
     private HadoopShims.ZeroCopyReaderShim zcr = null;
     private final FileSystem fs;
     private final Path path;
     private final boolean useZeroCopy;
-    private final CompressionCodec codec;
+    private CompressionCodec codec;
     private final int bufferSize;
     private final int typeCount;
     private CompressionKind compressionKind;
@@ -161,17 +163,14 @@ public class RecordReaderUtils {
       this.codec = OrcCodecPool.getCodec(compressionKind);
       this.bufferSize = properties.getBufferSize();
       this.typeCount = properties.getTypeCount();
-      if (useZeroCopy) {
-        this.pool = new ByteBufferAllocatorPool();
-      } else {
-        this.pool = null;
-      }
     }
 
     @Override
     public void open() throws IOException {
       this.file = fs.open(path);
       if (useZeroCopy) {
+        // The zero-copy reader (ZCR) only uses the codec for boolean checks.
+        pool = new ByteBufferAllocatorPool();
         zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
       } else {
         zcr = null;
@@ -231,8 +230,7 @@ public class RecordReaderUtils {
                 indexes[column] = OrcProto.RowIndex.parseFrom(
                     InStream.createCodedInputStream("index",
                         ReaderImpl.singleton(new BufferChunk(bb, 0)),
-                        stream.getLength(),
-                    codec, bufferSize));
+                        stream.getLength(), codec, bufferSize));
               }
               break;
             case BLOOM_FILTER:
@@ -267,9 +265,9 @@ public class RecordReaderUtils {
       // read the footer
       ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
       file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-      return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
-          ReaderImpl.singleton(new BufferChunk(tailBuf, 0)),
-          tailLength, codec, bufferSize));
+      return OrcProto.StripeFooter.parseFrom(
+          InStream.createCodedInputStream("footer", ReaderImpl.singleton(
+              new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize));
     }
 
     @Override
@@ -282,6 +280,7 @@ public class RecordReaderUtils {
     public void close() throws IOException {
       if (codec != null) {
         OrcCodecPool.returnCodec(compressionKind, codec);
+        codec = null;
       }
       if (pool != null) {
         pool.clear();
@@ -290,6 +289,7 @@ public class RecordReaderUtils {
       try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
         if (file != null) {
           file.close();
+          file = null;
         }
       }
     }
@@ -306,8 +306,18 @@ public class RecordReaderUtils {
 
     @Override
     public DataReader clone() {
+      if (this.file != null) {
+        // We should really throw here, but that would cause failures in Hive.
+        // For as long as Hive uses clone, just log a warning.
+        LOG.warn("Cloning an opened DataReader; the stream will be reused and closed twice");
+      }
       try {
-        return (DataReader) super.clone();
+        DefaultDataReader clone = (DefaultDataReader) super.clone();
+        if (codec != null) {
+          // Make sure we don't share the same codec between two readers.
+          clone.codec = OrcCodecPool.getCodec(clone.compressionKind);
+        }
+        return clone;
       } catch (CloneNotSupportedException e) {
         throw new UnsupportedOperationException("uncloneable", e);
       }
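
With this change, each clone draws a fresh codec from the pool, so closing
the original and closing the clone return distinct instances instead of
returning one shared codec twice. A minimal usage sketch (the factory call
and properties follow the surrounding API; the local names are illustrative):
clone the template reader before it is opened so that no stream is shared:

    DataReader template = RecordReaderUtils.createDefaultDataReader(properties);
    DataReader perThread = template.clone(); // gets its own codec; no open stream
    perThread.open();
    try {
      // ... read stripe footers, row indexes, and data ...
    } finally {
      perThread.close(); // returns the clone's codec to the pool
    }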