You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/22 09:11:38 UTC
[3/9] tajo git commit: TAJO-1250: RawFileAppender occasionally causes
BufferOverflowException. (jinho)
TAJO-1250: RawFileAppender occasionally causes BufferOverflowException. (jinho)
Closes #303
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cf66a390
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cf66a390
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cf66a390
Branch: refs/heads/index_support
Commit: cf66a390060c79ba757097886703e30f93d31401
Parents: a4c3484
Author: jhkim <jh...@apache.org>
Authored: Mon Dec 22 16:28:41 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Dec 22 16:28:41 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../org/apache/tajo/storage/StorageManager.java | 13 ++++
.../src/main/resources/storage-default.xml | 22 ++++++
.../src/test/resources/storage-default.xml | 22 ++++++
.../java/org/apache/tajo/storage/RawFile.java | 20 +++--
.../tajo/storage/text/DelimitedLineReader.java | 14 +++-
.../tajo/storage/text/DelimitedTextFile.java | 16 ++--
.../org/apache/tajo/storage/TestStorages.java | 77 ++++++++++++++++++++
.../src/test/resources/storage-default.xml | 22 ++++++
9 files changed, 195 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 36cff8a..245918e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -118,6 +118,9 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1250: RawFileAppender occasionally causes BufferOverflowException.
+ (jinho)
+
TAJO-1259: A title in catalog configuration document is different from others.
(Jongyoung Park via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 07a51ba..609ca20 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,6 +18,7 @@
package org.apache.tajo.storage;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -173,6 +174,18 @@ public abstract class StorageManager {
*/
public abstract void closeStorageManager();
+
+ /**
+ * Clears the constructor, scanner-handler, appender-handler, and storage-manager caches.
+ */
+ @VisibleForTesting
+ protected synchronized static void clearCache() {
+ CONSTRUCTOR_CACHE.clear();
+ SCANNER_HANDLER_CACHE.clear();
+ APPENDER_HANDLER_CACHE.clear();
+ storageManagers.clear();
+ }
+
/**
* It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
* In general Repartitioner determines the partition range using previous output statistics data.
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index 67033ed..abea9de 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -195,4 +195,26 @@
<name>tajo.storage.appender-handler.hfile.class</name>
<value>org.apache.tajo.storage.hbase.HFileAppender</value>
</property>
+
+ <!-- Storage buffer -->
+ <property>
+ <name>tajo.storage.text.io.read-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB read buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.text.io.write-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB write buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.raw.io.read-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB read buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.raw.io.write-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB write buffer</description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index d1c561b..712f664 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -161,4 +161,26 @@
<name>tajo.storage.appender-handler.avro.class</name>
<value>org.apache.tajo.storage.avro.AvroAppender</value>
</property>
+
+ <!-- Storage buffer -->
+ <property>
+ <name>tajo.storage.text.io.read-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB read buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.text.io.write-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB write buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.raw.io.read-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB read buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.raw.io.write-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB write buffer</description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 45e07d3..5213ba0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -46,6 +46,9 @@ import java.nio.channels.FileChannel;
public class RawFile {
private static final Log LOG = LogFactory.getLog(RawFile.class);
+ public static final String READ_BUFFER_SIZE = "tajo.storage.raw.io.read-buffer.bytes";
+ public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes";
+ public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
public static class RawFileScanner extends FileScanner implements SeekableScanner {
private FileChannel channel;
@@ -92,7 +95,7 @@ public class RawFile {
+ ", fragment length :" + fragment.getLength());
}
- buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+ buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
buffer = buf.nioBuffer(0, buf.capacity());
columnTypes = new DataType[schema.size()];
@@ -382,7 +385,7 @@ public class RawFile {
if (buffer.capacity() - buffer.remaining() < writableBytes) {
buf.setIndex(buffer.position(), buffer.limit());
buf.markReaderIndex();
- buf.discardSomeReadBytes();
+ buf.discardReadBytes();
buf.ensureWritable(writableBytes);
buffer = buf.nioBuffer(0, buf.capacity());
buffer.limit(buf.writerIndex());
@@ -491,7 +494,7 @@ public class RawFile {
columnTypes[i] = schema.getColumn(i).getDataType();
}
- buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+ buf = BufferPool.directBuffer(conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
buffer = buf.nioBuffer(0, buf.capacity());
// comput the number of bytes, representing the null flags
@@ -532,6 +535,13 @@ public class RawFile {
buffer.limit(limit);
buffer.compact();
+ // grow the write buffer when the remaining space is smaller than sizeToBeWritten
+ if(buffer.remaining() < sizeToBeWritten) {
+ buf.setIndex(buffer.position(), buffer.limit());
+ buf.ensureWritable(sizeToBeWritten);
+ buffer = buf.nioBuffer(0, buf.capacity());
+ buffer.position(buf.readerIndex());
+ }
return true;
} else {
return false;
@@ -632,8 +642,8 @@ public class RawFile {
continue;
}
- // 8 is the maximum bytes size of all types
- if (flushBufferAndReplace(recordOffset, 8)) {
+ // 10 is the maximum byte size of any type (a varint64 takes up to 10 bytes)
+ if (flushBufferAndReplace(recordOffset, 10)) {
recordOffset = 0;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index 1b433b5..8b33858 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -36,6 +36,7 @@ import org.apache.tajo.storage.ByteBufInputChannel;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.unit.StorageUnit;
import java.io.Closeable;
import java.io.DataInputStream;
@@ -45,7 +46,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DelimitedLineReader implements Closeable {
private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
- private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
private FileSystem fs;
private FSDataInputStream fis;
@@ -60,12 +60,18 @@ public class DelimitedLineReader implements Closeable {
private AtomicInteger lineReadBytes = new AtomicInteger();
private FileFragment fragment;
private Configuration conf;
+ private int bufferSize;
public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
+ this(conf, fragment, 128 * StorageUnit.KB);
+ }
+
+ public DelimitedLineReader(Configuration conf, final FileFragment fragment, int bufferSize) throws IOException {
this.fragment = fragment;
this.conf = conf;
this.factory = new CompressionCodecFactory(conf);
this.codec = factory.getCodec(fragment.getPath());
+ this.bufferSize = bufferSize;
if (this.codec instanceof SplittableCompressionCodec) {
throw new NotImplementedException(); // bzip2 does not support multi-thread model
}
@@ -83,14 +89,16 @@ public class DelimitedLineReader implements Closeable {
decompressor = CodecPool.getDecompressor(codec);
is = new DataInputStream(codec.createInputStream(fis, decompressor));
ByteBufInputChannel channel = new ByteBufInputChannel(is);
- lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
+
+ ByteBuf buf = BufferPool.directBuffer(bufferSize);
+ lineReader = new ByteBufLineReader(channel, buf);
} else {
fis.seek(startOffset);
is = fis;
ByteBufInputChannel channel = new ByteBufInputChannel(is);
lineReader = new ByteBufLineReader(channel,
- BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
+ BufferPool.directBuffer((int) Math.min(bufferSize, end)));
}
eof = false;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 59129d1..15db4c3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -39,6 +39,7 @@ import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.ReflectionUtil;
import java.io.BufferedOutputStream;
@@ -55,6 +56,9 @@ import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXN
public class DelimitedTextFile {
public static final byte LF = '\n';
+ public static final String READ_BUFFER_SIZE = "tajo.storage.text.io.read-buffer.bytes";
+ public static final String WRITE_BUFFER_SIZE = "tajo.storage.text.io.write-buffer.bytes";
+ public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
@@ -105,8 +109,7 @@ public class DelimitedTextFile {
private CompressionCodecFactory codecFactory;
private CompressionCodec codec;
private Path compressedPath;
- private byte[] nullChars;
- private int BUFFER_SIZE = 128 * 1024;
+ private int bufferSize;
private int bufferedBytes = 0;
private long pos = 0;
@@ -165,8 +168,9 @@ public class DelimitedTextFile {
serializer = getLineSerde().createSerializer(schema, meta);
serializer.init();
+ bufferSize = conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
if (os == null) {
- os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+ os = new NonSyncByteArrayOutputStream(bufferSize);
}
os.reset();
@@ -189,7 +193,7 @@ public class DelimitedTextFile {
bufferedBytes += rowBytes;
// refill buffer if necessary
- if (bufferedBytes > BUFFER_SIZE) {
+ if (bufferedBytes > bufferSize) {
flushBuffer();
}
// Statistical section
@@ -288,7 +292,7 @@ public class DelimitedTextFile {
final Fragment fragment)
throws IOException {
super(conf, schema, meta, fragment);
- reader = new DelimitedLineReader(conf, this.fragment);
+ reader = new DelimitedLineReader(conf, this.fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
if (!reader.isCompressed()) {
splittable = true;
}
@@ -307,7 +311,7 @@ public class DelimitedTextFile {
reader.close();
}
- reader = new DelimitedLineReader(conf, fragment);
+ reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
reader.init();
recordCount = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 15998f2..9577e3d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -94,6 +94,20 @@ public class TestStorages {
" ]\n" +
"}\n";
+ private static String TEST_MAX_VALUE_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testMaxValue\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"col4\", \"type\": \"float\" },\n" +
+ " { \"name\": \"col5\", \"type\": \"double\" },\n" +
+ " { \"name\": \"col1\", \"type\": \"int\" },\n" +
+ " { \"name\": \"col2\", \"type\": \"int\" },\n" +
+ " { \"name\": \"col3\", \"type\": \"long\" }\n" +
+ " ]\n" +
+ "}\n";
+
private StoreType storeType;
private boolean splitable;
private boolean statsable;
@@ -875,4 +889,67 @@ public class TestStorages {
assertEquals(appender.getStats().getNumRows().longValue(), readRows);
}
}
+
+ @Test
+ public void testMaxValue() throws IOException {
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.FLOAT4);
+ schema.addColumn("col2", Type.FLOAT8);
+ schema.addColumn("col3", Type.INT2);
+ schema.addColumn("col4", Type.INT4);
+ schema.addColumn("col5", Type.INT8);
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ if (storeType == StoreType.AVRO) {
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_MAX_VALUE_AVRO_SCHEMA);
+ }
+
+ if (storeType == StoreType.RAW) {
+ StorageManager.clearCache();
+ /* TAJO-1250: reproduce the BufferOverflowException of RawFile */
+ int headerSize = 4 + 2 + 1; //Integer record length + Short null-flag length + 1 byte null flags
+ /* max varint32: 5 bytes, max varint64: 10 bytes */
+ int record = 4 + 8 + 2 + 5 + 8; // required size is 27
+ conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize);
+ }
+
+ FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+ Path tablePath = new Path(testDir, "testMaxValue.data");
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+
+ appender.init();
+
+ Tuple tuple = new VTuple(5);
+ tuple.put(new Datum[]{
+ DatumFactory.createFloat4(Float.MAX_VALUE),
+ DatumFactory.createFloat8(Double.MAX_VALUE),
+ DatumFactory.createInt2(Short.MAX_VALUE),
+ DatumFactory.createInt4(Integer.MAX_VALUE),
+ DatumFactory.createInt8(Long.MAX_VALUE)
+ });
+
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = sm.getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ while ((retrieved = scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+
+
+ if (storeType == StoreType.RAW){
+ StorageManager.clearCache();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 737284b..adddf66 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -175,4 +175,26 @@
<name>tajo.storage.appender-handler.avro.class</name>
<value>org.apache.tajo.storage.avro.AvroAppender</value>
</property>
+
+ <!-- Storage buffer -->
+ <property>
+ <name>tajo.storage.text.io.read-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB read buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.text.io.write-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB write buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.raw.io.read-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB read buffer</description>
+ </property>
+ <property>
+ <name>tajo.storage.raw.io.write-buffer.bytes</name>
+ <value>131072</value>
+ <description>128KB write buffer</description>
+ </property>
</configuration>