You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/03/05 06:22:56 UTC

git commit: TAJO-653: RCFileAppender throws IOException. (jinho)

Repository: incubator-tajo
Updated Branches:
  refs/heads/branch-0.8.0 16064ca44 -> f4aad2bf3


TAJO-653: RCFileAppender throws IOException. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/f4aad2bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/f4aad2bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/f4aad2bf

Branch: refs/heads/branch-0.8.0
Commit: f4aad2bf3e28dbde765bdab562157212a51d71ab
Parents: 16064ca
Author: jinossy <ji...@gmail.com>
Authored: Wed Mar 5 14:22:02 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Wed Mar 5 14:22:02 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                      |  2 ++
 .../org/apache/tajo/storage/rcfile/RCFile.java   | 19 ++++++++++---------
 2 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f4aad2bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b263460..de81ec8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -269,6 +269,8 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-653: RCFileAppender throws IOException. (jinho)
+
     TAJO-641: NPE in HCatalogStore.addTable(). (jaehwa)
 
     TAJO-646: TajoClient is blocked while main thread finished. 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f4aad2bf/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index afbcaa8..90d7f48 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -41,6 +41,7 @@ import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.Bytes;
 
 import java.io.*;
+import java.io.Closeable;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import java.util.Arrays;
@@ -436,7 +437,7 @@ public class RCFile {
    * column_2_row_2_value,....]</li>
    * </ul>
    */
-  public static class ValueBuffer {
+  public static class ValueBuffer implements Closeable{
 
     // used to load columns' value into memory
     private NonSyncByteArrayOutputStream[] loadedColumnsValueBuffer = null;
@@ -533,6 +534,7 @@ public class RCFile {
       decompressBuffer.reset();
     }
 
+    @Override
     public void close() {
       for (NonSyncByteArrayOutputStream element : loadedColumnsValueBuffer) {
         IOUtils.closeStream(element);
@@ -581,7 +583,7 @@ public class RCFile {
     // how many records the writer buffers before it writes to disk
     private int RECORD_INTERVAL = Integer.MAX_VALUE;
     // the max size of memory for buffering records before writes them out
-    private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 4M
+    private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 16M
     // the conf string for COLUMNS_BUFFER_SIZE
     public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
 
@@ -963,7 +965,7 @@ public class RCFile {
 
         valueBuffer = new NonSyncByteArrayOutputStream();
         deflateFilter = codec.createOutputStream(valueBuffer, compressor);
-        deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+        deflateOut = new DataOutputStream(deflateFilter);
       }
 
       try {
@@ -989,7 +991,7 @@ public class RCFile {
           valueLength += colLen;
         }
       } catch (IOException e) {
-        IOUtils.cleanup(LOG, deflateOut);
+        IOUtils.cleanup(LOG, deflateOut, out);
         throw e;
       }
 
@@ -1008,7 +1010,7 @@ public class RCFile {
         try {
           out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
         } finally {
-          IOUtils.cleanup(LOG, valueBuffer, deflateOut, deflateFilter);
+          IOUtils.cleanup(LOG, valueBuffer);
         }
       } else {
         for (int columnIndex = 0; columnIndex < columnNumber; ++columnIndex) {
@@ -1038,7 +1040,7 @@ public class RCFile {
 
         NonSyncByteArrayOutputStream compressionBuffer = new NonSyncByteArrayOutputStream();
         CompressionOutputStream deflateFilter = codec.createOutputStream(compressionBuffer, compressor);
-        DataOutputStream deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+        DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
 
         //compress key and write key out
         compressionBuffer.reset();
@@ -1083,7 +1085,7 @@ public class RCFile {
       if (out != null) {
         // Close the underlying stream if we own it...
         out.flush();
-        out.close();
+        IOUtils.cleanup(LOG, out);
         out = null;
       }
     }
@@ -1727,8 +1729,7 @@ public class RCFile {
 
     @Override
     public void close() throws IOException {
-      IOUtils.closeStream(in);
-      currentValue.close();
+      IOUtils.cleanup(LOG, in, currentValue);
       if (keyDecompressor != null) {
         // Make sure we only return decompressor once.
         org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);