You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2018/03/15 16:39:01 UTC
orc git commit: ORC-310 Improved error handling of compression codecs
when being reset. Also fixes reuse of codecs via DataReader.clone().
Repository: orc
Updated Branches:
refs/heads/master 3d44366a4 -> 7dfe4a748
ORC-310 Improved error handling of compression codecs when being reset. Also
fixes reuse of codecs via DataReader.clone().
Fixes #222
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/7dfe4a74
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/7dfe4a74
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/7dfe4a74
Branch: refs/heads/master
Commit: 7dfe4a7483145be109201f1dbeeffac875ccef1a
Parents: 3d44366
Author: sergey <se...@apache.org>
Authored: Wed Feb 28 16:00:51 2018 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu Mar 15 09:37:24 2018 -0700
----------------------------------------------------------------------
.../src/java/org/apache/orc/DataReader.java | 7 +++-
.../java/org/apache/orc/impl/OrcCodecPool.java | 37 ++++++++++-------
.../org/apache/orc/impl/PhysicalFsWriter.java | 10 ++++-
.../org/apache/orc/impl/RecordReaderUtils.java | 42 ++++++++++++--------
4 files changed, 61 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java
index 3155862..ed3acbb 100644
--- a/java/core/src/java/org/apache/orc/DataReader.java
+++ b/java/core/src/java/org/apache/orc/DataReader.java
@@ -79,7 +79,10 @@ public interface DataReader extends AutoCloseable, Cloneable {
@Override
public void close() throws IOException;
- /** Returns the compression codec used by this datareader.
- * @return */
+ /**
+ * Returns the compression codec used by this datareader.
+ * We should consider removing this from the interface.
+ * @return the compression codec
+ */
CompressionCodec getCompressionCodec();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
index 56b9896..24ee618 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -18,10 +18,8 @@
package org.apache.orc.impl;
import java.util.concurrent.ConcurrentHashMap;
-
import java.util.ArrayList;
import java.util.List;
-
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.slf4j.Logger;
@@ -62,26 +60,35 @@ public final class OrcCodecPool {
return codec;
}
+ /**
+ * Returns the codec to the pool or closes it, suppressing exceptions.
+ * @param kind Compression kind.
+ * @param codec Codec.
+ */
public static void returnCodec(CompressionKind kind, CompressionCodec codec) {
if (codec == null) {
return;
}
assert kind != CompressionKind.NONE;
- codec.reset();
- List<CompressionCodec> list = POOL.get(kind);
- if (list == null) {
- List<CompressionCodec> newList = new ArrayList<>();
- List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList);
- list = (oldList == null) ? newList : oldList;
- }
- synchronized (list) {
- if (list.size() < MAX_PER_KIND) {
- list.add(codec);
- return;
+ try {
+ codec.reset();
+ List<CompressionCodec> list = POOL.get(kind);
+ if (list == null) {
+ List<CompressionCodec> newList = new ArrayList<>();
+ List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList);
+ list = (oldList == null) ? newList : oldList;
+ }
+ synchronized (list) {
+ if (list.size() < MAX_PER_KIND) {
+ list.add(codec);
+ return;
+ }
}
+ // We didn't add the codec to the list.
+ codec.close();
+ } catch (Exception ex) {
+ LOG.error("Ignoring codec cleanup error", ex);
}
- // We didn't add the codec to the list.
- codec.close();
}
public static int getPoolSize(CompressionKind kind) {
http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 918fae8..38ca40e 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -46,7 +46,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final HadoopShims shims = HadoopShimsFactory.get();
- private final FSDataOutputStream rawWriter;
+ private FSDataOutputStream rawWriter;
// the compressed metadata information outStream
private OutStream writer = null;
// a protobuf outStream around streamFactory
@@ -58,7 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
private final double paddingTolerance;
private final long defaultStripeSize;
private final CompressionKind compress;
- private final CompressionCodec codec;
+ private CompressionCodec codec;
private final boolean addBlockPadding;
// the streams that make up the current stripe
@@ -225,8 +225,14 @@ public class PhysicalFsWriter implements PhysicalWriter {
@Override
public void close() throws IOException {
+ // We don't use the codec directly, but we do hand it out via getCompressionCodec();
+ // that is used in tests, for boolean checks, and in StreamFactory. Removing this pattern
+ // would require cross-project interface changes, so for now just return the codec to the
+ // pool here.
OrcCodecPool.returnCodec(compress, codec);
+ codec = null;
rawWriter.close();
+ rawWriter = null;
}
@Override
http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 1e2d0f1..705e768 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,13 +17,15 @@
*/
package org.apache.orc.impl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +37,6 @@ import org.apache.orc.CompressionKind;
import org.apache.orc.DataReader;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
-
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
@@ -44,6 +45,7 @@ import org.apache.orc.TypeDescription;
*/
public class RecordReaderUtils {
private static final HadoopShims SHIMS = HadoopShimsFactory.get();
+ private static final Logger LOG = LoggerFactory.getLogger(RecordReaderUtils.class);
static boolean hadBadBloomFilters(TypeDescription.Category category,
OrcFile.WriterVersion version) {
@@ -143,12 +145,12 @@ public class RecordReaderUtils {
private static class DefaultDataReader implements DataReader {
private FSDataInputStream file = null;
- private final ByteBufferAllocatorPool pool;
+ private ByteBufferAllocatorPool pool;
private HadoopShims.ZeroCopyReaderShim zcr = null;
private final FileSystem fs;
private final Path path;
private final boolean useZeroCopy;
- private final CompressionCodec codec;
+ private CompressionCodec codec;
private final int bufferSize;
private final int typeCount;
private CompressionKind compressionKind;
@@ -161,17 +163,14 @@ public class RecordReaderUtils {
this.codec = OrcCodecPool.getCodec(compressionKind);
this.bufferSize = properties.getBufferSize();
this.typeCount = properties.getTypeCount();
- if (useZeroCopy) {
- this.pool = new ByteBufferAllocatorPool();
- } else {
- this.pool = null;
- }
}
@Override
public void open() throws IOException {
this.file = fs.open(path);
if (useZeroCopy) {
+ // ZCR only uses codec for boolean checks.
+ pool = new ByteBufferAllocatorPool();
zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
} else {
zcr = null;
@@ -231,8 +230,7 @@ public class RecordReaderUtils {
indexes[column] = OrcProto.RowIndex.parseFrom(
InStream.createCodedInputStream("index",
ReaderImpl.singleton(new BufferChunk(bb, 0)),
- stream.getLength(),
- codec, bufferSize));
+ stream.getLength(), codec, bufferSize));
}
break;
case BLOOM_FILTER:
@@ -267,9 +265,9 @@ public class RecordReaderUtils {
// read the footer
ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
- return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
- ReaderImpl.singleton(new BufferChunk(tailBuf, 0)),
- tailLength, codec, bufferSize));
+ return OrcProto.StripeFooter.parseFrom(
+ InStream.createCodedInputStream("footer", ReaderImpl.singleton(
+ new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize));
}
@Override
@@ -282,6 +280,7 @@ public class RecordReaderUtils {
public void close() throws IOException {
if (codec != null) {
OrcCodecPool.returnCodec(compressionKind, codec);
+ codec = null;
}
if (pool != null) {
pool.clear();
@@ -290,6 +289,7 @@ public class RecordReaderUtils {
try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
if (file != null) {
file.close();
+ file = null;
}
}
}
@@ -306,8 +306,18 @@ public class RecordReaderUtils {
@Override
public DataReader clone() {
+ if (this.file != null) {
+ // We should really throw here, but that would cause failures in Hive.
+ // As long as Hive relies on clone(), just log a warning instead.
+ LOG.warn("Cloning an opened DataReader; the stream will be reused and closed twice");
+ }
try {
- return (DataReader) super.clone();
+ DefaultDataReader clone = (DefaultDataReader) super.clone();
+ if (codec != null) {
+ // Make sure we don't share the same codec between two readers.
+ clone.codec = OrcCodecPool.getCodec(clone.compressionKind);
+ }
+ return clone;
} catch (CloneNotSupportedException e) {
throw new UnsupportedOperationException("uncloneable", e);
}