You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/22 09:11:38 UTC

[3/9] tajo git commit: TAJO-1250: RawFileAppender occasionally causes BufferOverflowException. (jinho)

TAJO-1250: RawFileAppender occasionally causes BufferOverflowException. (jinho)

Closes #303


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cf66a390
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cf66a390
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cf66a390

Branch: refs/heads/index_support
Commit: cf66a390060c79ba757097886703e30f93d31401
Parents: a4c3484
Author: jhkim <jh...@apache.org>
Authored: Mon Dec 22 16:28:41 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Dec 22 16:28:41 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../org/apache/tajo/storage/StorageManager.java | 13 ++++
 .../src/main/resources/storage-default.xml      | 22 ++++++
 .../src/test/resources/storage-default.xml      | 22 ++++++
 .../java/org/apache/tajo/storage/RawFile.java   | 20 +++--
 .../tajo/storage/text/DelimitedLineReader.java  | 14 +++-
 .../tajo/storage/text/DelimitedTextFile.java    | 16 ++--
 .../org/apache/tajo/storage/TestStorages.java   | 77 ++++++++++++++++++++
 .../src/test/resources/storage-default.xml      | 22 ++++++
 9 files changed, 195 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 36cff8a..245918e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -118,6 +118,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1250: RawFileAppender occasionally causes BufferOverflowException. 
+    (jinho)
+
     TAJO-1259: A title in catalog configuration document is different from others. 
     (Jongyoung Park via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 07a51ba..609ca20 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -173,6 +174,18 @@ public abstract class StorageManager {
    */
   public abstract void closeStorageManager();
 
+
+  /**
+   * Clears the constructor, scanner-handler and appender-handler caches and the storageManagers collection (test-only).
+   */
+  @VisibleForTesting
+  protected synchronized static void clearCache() {
+    CONSTRUCTOR_CACHE.clear();
+    SCANNER_HANDLER_CACHE.clear();
+    APPENDER_HANDLER_CACHE.clear();
+    storageManagers.clear();
+  }
+
   /**
    * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
    * In general Repartitioner determines the partition range using previous output statistics data.

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index 67033ed..abea9de 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -195,4 +195,26 @@
     <name>tajo.storage.appender-handler.hfile.class</name>
     <value>org.apache.tajo.storage.hbase.HFileAppender</value>
   </property>
+
+  <!--- Storage buffer -->
+  <property>
+    <name>tajo.storage.text.io.read-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB read buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.text.io.write-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB write buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.raw.io.read-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB read buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.raw.io.write-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB write buffer</description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index d1c561b..712f664 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -161,4 +161,26 @@
     <name>tajo.storage.appender-handler.avro.class</name>
     <value>org.apache.tajo.storage.avro.AvroAppender</value>
   </property>
+
+  <!--- Storage buffer -->
+  <property>
+    <name>tajo.storage.text.io.read-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB read buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.text.io.write-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB write buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.raw.io.read-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB read buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.raw.io.write-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB write buffer</description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 45e07d3..5213ba0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -46,6 +46,9 @@ import java.nio.channels.FileChannel;
 
 public class RawFile {
   private static final Log LOG = LogFactory.getLog(RawFile.class);
+  public static final String READ_BUFFER_SIZE = "tajo.storage.raw.io.read-buffer.bytes";
+  public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes";
+  public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
 
   public static class RawFileScanner extends FileScanner implements SeekableScanner {
     private FileChannel channel;
@@ -92,7 +95,7 @@ public class RawFile {
             + ", fragment length :" + fragment.getLength());
       }
 
-      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
       buffer = buf.nioBuffer(0, buf.capacity());
 
       columnTypes = new DataType[schema.size()];
@@ -382,7 +385,7 @@ public class RawFile {
       if (buffer.capacity() - buffer.remaining()  <  writableBytes) {
         buf.setIndex(buffer.position(), buffer.limit());
         buf.markReaderIndex();
-        buf.discardSomeReadBytes();
+        buf.discardReadBytes();
         buf.ensureWritable(writableBytes);
         buffer = buf.nioBuffer(0, buf.capacity());
         buffer.limit(buf.writerIndex());
@@ -491,7 +494,7 @@ public class RawFile {
         columnTypes[i] = schema.getColumn(i).getDataType();
       }
 
-      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buf = BufferPool.directBuffer(conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
       buffer = buf.nioBuffer(0, buf.capacity());
 
       // comput the number of bytes, representing the null flags
@@ -532,6 +535,13 @@ public class RawFile {
         buffer.limit(limit);
         buffer.compact();
 
+        // grow the write buffer if it still cannot hold sizeToBeWritten bytes
+        if(buffer.remaining() < sizeToBeWritten) {
+          buf.setIndex(buffer.position(), buffer.limit());
+          buf.ensureWritable(sizeToBeWritten);
+          buffer = buf.nioBuffer(0, buf.capacity());
+          buffer.position(buf.readerIndex());
+        }
         return true;
       } else {
         return false;
@@ -632,8 +642,8 @@ public class RawFile {
           continue;
         }
 
-        // 8 is the maximum bytes size of all types
-        if (flushBufferAndReplace(recordOffset, 8)) {
+        // 10 is the maximum byte size of all types (a full-length varint64)
+        if (flushBufferAndReplace(recordOffset, 10)) {
           recordOffset = 0;
         }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index 1b433b5..8b33858 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -36,6 +36,7 @@ import org.apache.tajo.storage.ByteBufInputChannel;
 import org.apache.tajo.storage.FileScanner;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.unit.StorageUnit;
 
 import java.io.Closeable;
 import java.io.DataInputStream;
@@ -45,7 +46,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class DelimitedLineReader implements Closeable {
   private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
-  private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
 
   private FileSystem fs;
   private FSDataInputStream fis;
@@ -60,12 +60,18 @@ public class DelimitedLineReader implements Closeable {
   private AtomicInteger lineReadBytes = new AtomicInteger();
   private FileFragment fragment;
   private Configuration conf;
+  private int bufferSize;
 
   public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
+    this(conf, fragment, 128 * StorageUnit.KB);
+  }
+
+  public DelimitedLineReader(Configuration conf, final FileFragment fragment, int bufferSize) throws IOException {
     this.fragment = fragment;
     this.conf = conf;
     this.factory = new CompressionCodecFactory(conf);
     this.codec = factory.getCodec(fragment.getPath());
+    this.bufferSize = bufferSize;
     if (this.codec instanceof SplittableCompressionCodec) {
       throw new NotImplementedException(); // bzip2 does not support multi-thread model
     }
@@ -83,14 +89,16 @@ public class DelimitedLineReader implements Closeable {
       decompressor = CodecPool.getDecompressor(codec);
       is = new DataInputStream(codec.createInputStream(fis, decompressor));
       ByteBufInputChannel channel = new ByteBufInputChannel(is);
-      lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
+
+      ByteBuf buf = BufferPool.directBuffer(bufferSize);
+      lineReader = new ByteBufLineReader(channel, buf);
     } else {
       fis.seek(startOffset);
       is = fis;
 
       ByteBufInputChannel channel = new ByteBufInputChannel(is);
       lineReader = new ByteBufLineReader(channel,
-          BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
+          BufferPool.directBuffer((int) Math.min(bufferSize, end)));
     }
     eof = false;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 59129d1..15db4c3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -39,6 +39,7 @@ import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.ReflectionUtil;
 
 import java.io.BufferedOutputStream;
@@ -55,6 +56,9 @@ import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXN
 public class DelimitedTextFile {
 
   public static final byte LF = '\n';
+  public static final String READ_BUFFER_SIZE = "tajo.storage.text.io.read-buffer.bytes";
+  public static final String WRITE_BUFFER_SIZE = "tajo.storage.text.io.write-buffer.bytes";
+  public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
 
   private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
 
@@ -105,8 +109,7 @@ public class DelimitedTextFile {
     private CompressionCodecFactory codecFactory;
     private CompressionCodec codec;
     private Path compressedPath;
-    private byte[] nullChars;
-    private int BUFFER_SIZE = 128 * 1024;
+    private int bufferSize;
     private int bufferedBytes = 0;
     private long pos = 0;
 
@@ -165,8 +168,9 @@ public class DelimitedTextFile {
       serializer = getLineSerde().createSerializer(schema, meta);
       serializer.init();
 
+      bufferSize = conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
       if (os == null) {
-        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+        os = new NonSyncByteArrayOutputStream(bufferSize);
       }
 
       os.reset();
@@ -189,7 +193,7 @@ public class DelimitedTextFile {
       bufferedBytes += rowBytes;
 
       // refill buffer if necessary
-      if (bufferedBytes > BUFFER_SIZE) {
+      if (bufferedBytes > bufferSize) {
         flushBuffer();
       }
       // Statistical section
@@ -288,7 +292,7 @@ public class DelimitedTextFile {
                                     final Fragment fragment)
         throws IOException {
       super(conf, schema, meta, fragment);
-      reader = new DelimitedLineReader(conf, this.fragment);
+      reader = new DelimitedLineReader(conf, this.fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
       if (!reader.isCompressed()) {
         splittable = true;
       }
@@ -307,7 +311,7 @@ public class DelimitedTextFile {
         reader.close();
       }
 
-      reader = new DelimitedLineReader(conf, fragment);
+      reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
       reader.init();
       recordCount = 0;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 15998f2..9577e3d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -94,6 +94,20 @@ public class TestStorages {
       "  ]\n" +
       "}\n";
 
+  private static String TEST_MAX_VALUE_AVRO_SCHEMA =
+      "{\n" +
+          "  \"type\": \"record\",\n" +
+          "  \"namespace\": \"org.apache.tajo\",\n" +
+          "  \"name\": \"testMaxValue\",\n" +
+          "  \"fields\": [\n" +
+          "    { \"name\": \"col4\", \"type\": \"float\" },\n" +
+          "    { \"name\": \"col5\", \"type\": \"double\" },\n" +
+          "    { \"name\": \"col1\", \"type\": \"int\" },\n" +
+          "    { \"name\": \"col2\", \"type\": \"int\" },\n" +
+          "    { \"name\": \"col3\", \"type\": \"long\" }\n" +
+          "  ]\n" +
+          "}\n";
+
   private StoreType storeType;
   private boolean splitable;
   private boolean statsable;
@@ -875,4 +889,67 @@ public class TestStorages {
       assertEquals(appender.getStats().getNumRows().longValue(), readRows);
     }
   }
+
+  @Test
+  public void testMaxValue() throws IOException {
+    // Writes one tuple holding the maximum value of each numeric column type and checks it is read back unchanged.
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.FLOAT4);
+    schema.addColumn("col2", Type.FLOAT8);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_MAX_VALUE_AVRO_SCHEMA);
+    }
+
+    if (storeType == StoreType.RAW) {
+      StorageManager.clearCache();
+      /* TAJO-1250: shrink the RAW write buffer to roughly one record to reproduce the BufferOverflow of RawFile */
+      int headerSize = 4 + 2 + 1; // int record length + short null-flag length + 1 byte of null flags
+      /* max varint32: 5 bytes, max varint64: 10 bytes. NOTE(review): the last term below is 8, not 10 - confirm intent */
+      int record = 4 + 8 + 2 + 5 + 8; // presumably one term per column in schema order; required size is 27
+      conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize);
+    }
+
+    FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Path tablePath = new Path(testDir, "testMaxValue.data");
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+
+    appender.init();
+
+    Tuple tuple = new VTuple(5);
+    tuple.put(new Datum[]{
+        DatumFactory.createFloat4(Float.MAX_VALUE),
+        DatumFactory.createFloat8(Double.MAX_VALUE),
+        DatumFactory.createInt2(Short.MAX_VALUE),
+        DatumFactory.createInt4(Integer.MAX_VALUE),
+        DatumFactory.createInt8(Long.MAX_VALUE)
+    });
+
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = sm.getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved = scanner.next()) != null) { // NOTE(review): passes vacuously if the scanner returns no tuple
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+    scanner.close();
+
+
+    if (storeType == StoreType.RAW){
+      StorageManager.clearCache();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 737284b..adddf66 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -175,4 +175,26 @@
     <name>tajo.storage.appender-handler.avro.class</name>
     <value>org.apache.tajo.storage.avro.AvroAppender</value>
   </property>
+
+  <!--- Storage buffer -->
+  <property>
+    <name>tajo.storage.text.io.read-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB read buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.text.io.write-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB write buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.raw.io.read-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB read buffer</description>
+  </property>
+  <property>
+    <name>tajo.storage.raw.io.write-buffer.bytes</name>
+    <value>131072</value>
+    <description>128KB write buffer</description>
+  </property>
 </configuration>