You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/03/05 06:22:56 UTC
git commit: TAJO-653: RCFileAppender throws IOException. (jinho)
Repository: incubator-tajo
Updated Branches:
refs/heads/branch-0.8.0 16064ca44 -> f4aad2bf3
TAJO-653: RCFileAppender throws IOException. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/f4aad2bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/f4aad2bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/f4aad2bf
Branch: refs/heads/branch-0.8.0
Commit: f4aad2bf3e28dbde765bdab562157212a51d71ab
Parents: 16064ca
Author: jinossy <ji...@gmail.com>
Authored: Wed Mar 5 14:22:02 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Wed Mar 5 14:22:02 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../org/apache/tajo/storage/rcfile/RCFile.java | 19 ++++++++++---------
2 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f4aad2bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b263460..de81ec8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -269,6 +269,8 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-653: RCFileAppender throws IOException. (jinho)
+
TAJO-641: NPE in HCatalogStore.addTable(). (jaehwa)
TAJO-646: TajoClient is blocked while main thread finished.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f4aad2bf/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index afbcaa8..90d7f48 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -41,6 +41,7 @@ import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.Bytes;
import java.io.*;
+import java.io.Closeable;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.util.Arrays;
@@ -436,7 +437,7 @@ public class RCFile {
* column_2_row_2_value,....]</li>
* </ul>
*/
- public static class ValueBuffer {
+ public static class ValueBuffer implements Closeable{
// used to load columns' value into memory
private NonSyncByteArrayOutputStream[] loadedColumnsValueBuffer = null;
@@ -533,6 +534,7 @@ public class RCFile {
decompressBuffer.reset();
}
+ @Override
public void close() {
for (NonSyncByteArrayOutputStream element : loadedColumnsValueBuffer) {
IOUtils.closeStream(element);
@@ -581,7 +583,7 @@ public class RCFile {
// how many records the writer buffers before it writes to disk
private int RECORD_INTERVAL = Integer.MAX_VALUE;
// the max size of memory for buffering records before writes them out
- private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 4M
+ private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 16M
// the conf string for COLUMNS_BUFFER_SIZE
public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
@@ -963,7 +965,7 @@ public class RCFile {
valueBuffer = new NonSyncByteArrayOutputStream();
deflateFilter = codec.createOutputStream(valueBuffer, compressor);
- deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+ deflateOut = new DataOutputStream(deflateFilter);
}
try {
@@ -989,7 +991,7 @@ public class RCFile {
valueLength += colLen;
}
} catch (IOException e) {
- IOUtils.cleanup(LOG, deflateOut);
+ IOUtils.cleanup(LOG, deflateOut, out);
throw e;
}
@@ -1008,7 +1010,7 @@ public class RCFile {
try {
out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
} finally {
- IOUtils.cleanup(LOG, valueBuffer, deflateOut, deflateFilter);
+ IOUtils.cleanup(LOG, valueBuffer);
}
} else {
for (int columnIndex = 0; columnIndex < columnNumber; ++columnIndex) {
@@ -1038,7 +1040,7 @@ public class RCFile {
NonSyncByteArrayOutputStream compressionBuffer = new NonSyncByteArrayOutputStream();
CompressionOutputStream deflateFilter = codec.createOutputStream(compressionBuffer, compressor);
- DataOutputStream deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+ DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
//compress key and write key out
compressionBuffer.reset();
@@ -1083,7 +1085,7 @@ public class RCFile {
if (out != null) {
// Close the underlying stream if we own it...
out.flush();
- out.close();
+ IOUtils.cleanup(LOG, out);
out = null;
}
}
@@ -1727,8 +1729,7 @@ public class RCFile {
@Override
public void close() throws IOException {
- IOUtils.closeStream(in);
- currentValue.close();
+ IOUtils.cleanup(LOG, in, currentValue);
if (keyDecompressor != null) {
// Make sure we only return decompressor once.
org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);