You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by se...@apache.org on 2018/03/09 01:19:55 UTC

[3/3] orc git commit: update

update


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/c6a58c67
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/c6a58c67
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/c6a58c67

Branch: refs/heads/orc-310
Commit: c6a58c67051b136246030153e798980200b47f1a
Parents: 06c93b3
Author: sergey <se...@apache.org>
Authored: Thu Mar 8 17:13:27 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Mar 8 17:13:27 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/orc/impl/OrcCodecPool.java  |  3 +++
 .../org/apache/orc/impl/PhysicalFsWriter.java   |  8 +++++---
 .../org/apache/orc/impl/RecordReaderUtils.java  | 21 ++++++++++++--------
 3 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/c6a58c67/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
index 4ae43a2..8269316 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -69,6 +69,9 @@ public final class OrcCodecPool {
    */
   public static void returnCodecSafely(
       CompressionKind kind, CompressionCodec codec, boolean observedError) {
+    if (codec == null) {
+      return;
+    }
     try {
       if (!observedError) {
         returnCodec(kind, codec);

http://git-wip-us.apache.org/repos/asf/orc/blob/c6a58c67/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 5d7f75a..3f8e4e2 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -46,7 +46,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final HadoopShims shims = HadoopShimsFactory.get();
 
-  private final FSDataOutputStream rawWriter;
+  private FSDataOutputStream rawWriter;
   // the compressed metadata information outStream
   private OutStream writer = null;
   // a protobuf outStream around streamFactory
@@ -58,7 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
   private final double paddingTolerance;
   private final long defaultStripeSize;
   private final CompressionKind compress;
-  private final CompressionCodec codec;
+  private CompressionCodec codec;
   private final boolean addBlockPadding;
 
   // the streams that make up the current stripe
@@ -228,10 +228,12 @@ public class PhysicalFsWriter implements PhysicalWriter {
     // We don't use the codec directly but do give it out codec in getCompressionCodec;
     // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that
     // would get rid of this pattern require cross-project interface changes, so just return the
-    // codec for now. If the codec is broken, reset will usually throw, so this is still the\
+    // codec for now. If the codec is broken, reset will usually throw, so this is still the
     // correct thing to do.
     OrcCodecPool.returnCodecSafely(compress, codec, false);
+    codec = null;
     rawWriter.close();
+    rawWriter = null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/orc/blob/c6a58c67/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index b486571..9d9e31a 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -143,12 +143,12 @@ public class RecordReaderUtils {
 
   private static class DefaultDataReader implements DataReader {
     private FSDataInputStream file = null;
-    private final ByteBufferAllocatorPool pool;
+    private ByteBufferAllocatorPool pool;
     private HadoopShims.ZeroCopyReaderShim zcr = null;
     private final FileSystem fs;
     private final Path path;
     private final boolean useZeroCopy;
-    private final CompressionCodec codec;
+    private CompressionCodec codec;
     private boolean hasCodecError = false;
     private final int bufferSize;
     private final int typeCount;
@@ -162,11 +162,6 @@ public class RecordReaderUtils {
       this.codec = OrcCodecPool.getCodec(compressionKind);
       this.bufferSize = properties.getBufferSize();
       this.typeCount = properties.getTypeCount();
-      if (useZeroCopy) {
-        this.pool = new ByteBufferAllocatorPool();
-      } else {
-        this.pool = null;
-      }
     }
 
     @Override
@@ -174,6 +169,7 @@ public class RecordReaderUtils {
       this.file = fs.open(path);
       if (useZeroCopy) {
         // ZCR only uses codec for boolean checks.
+        pool = new ByteBufferAllocatorPool();
         zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
       } else {
         zcr = null;
@@ -308,6 +304,7 @@ public class RecordReaderUtils {
     public void close() throws IOException {
       if (codec != null) {
         OrcCodecPool.returnCodecSafely(compressionKind, codec, hasCodecError);
+        codec = null;
       }
       if (pool != null) {
         pool.clear();
@@ -316,6 +313,7 @@ public class RecordReaderUtils {
       try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
         if (file != null) {
           file.close();
+          file = null;
         }
       }
     }
@@ -332,8 +330,15 @@ public class RecordReaderUtils {
 
     @Override
     public DataReader clone() {
+      if (this.file != null) {
+        throw new UnsupportedOperationException(
+            "Cannot clone a DataReader that is already opened");
+      }
       try {
-        return (DataReader) super.clone();
+        DefaultDataReader clone = (DefaultDataReader) super.clone();
+        // Make sure we don't share the same codec between two readers.
+        clone.codec = OrcCodecPool.getCodec(clone.compressionKind);
+        return clone;
       } catch (CloneNotSupportedException e) {
         throw new UnsupportedOperationException("uncloneable", e);
       }