You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:41 UTC
[27/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
deleted file mode 100644
index 2f742c6..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.CharsetUtil;
-import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.ByteBufInputChannel;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
- /**
-  * Reads lines terminated by CR, LF or CRLF from a {@link ByteBufInputChannel}. Channel data is
-  * buffered in a single {@link ByteBuf} that is compacted before every refill and grown when a
-  * single line does not fit into it.
-  */
- public class ByteBufLineReader implements Closeable {
-   /** Capacity (64 KiB) of the direct buffer allocated by the single-argument constructor. */
-   private static final int DEFAULT_BUFFER = 64 * 1024;
-
-   /** Current capacity of {@link #buffer}; updated when the buffer is grown. */
-   private int bufferSize;
-   /** Total number of bytes read from the channel so far. */
-   private long readBytes;
-   /** Buffer index at which the line currently being scanned starts. */
-   private int startIndex;
-   /** Set when the channel reports end of stream while the buffer still had free space. */
-   private boolean eof = false;
-   private ByteBuf buffer;
-   private final ByteBufInputChannel channel;
-   /** Scratch counter handed to {@link #readLineBuf} by {@link #readLine()}. */
-   private final AtomicInteger lineReadBytes = new AtomicInteger();
-   /** Finds line terminators; its {@code isPrevCharCR()} is used to complete CRLF pairs. */
-   private final LineSplitProcessor processor = new LineSplitProcessor();
-
-   /** Creates a reader over {@code channel} backed by a new 64 KiB direct buffer. */
-   public ByteBufLineReader(ByteBufInputChannel channel) {
-     this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
-   }
-
-   /**
-    * Creates a reader over {@code channel} that reads into {@code buf}. The buffer is released by
-    * {@link #close()}.
-    */
-   public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
-     this.readBytes = 0;
-     this.channel = channel;
-     this.buffer = buf;
-     this.bufferSize = buf.capacity();
-   }
-
-   /** Returns the bytes consumed so far: bytes read from the channel minus unread buffered bytes. */
-   public long readBytes() {
-     return readBytes - buffer.readableBytes();
-   }
-
-   /** Releases the buffer if it is still referenced, then closes the channel. */
-   @Override
-   public void close() throws IOException {
-     if (this.buffer.refCnt() > 0) {
-       this.buffer.release();
-     }
-     this.channel.close();
-   }
-
-   /**
-    * Reads the next line and decodes it as UTF-8.
-    *
-    * @return the line without its terminator, or {@code null} when no further line is available
-    */
-   public String readLine() throws IOException {
-     ByteBuf buf = readLineBuf(lineReadBytes);
-     if (buf != null) {
-       return buf.toString(CharsetUtil.UTF_8);
-     }
-     return null;
-   }
-
-   /**
-    * Compacts the buffer, keeping the unread tail of a partially scanned line, grows it when it is
-    * full, and fills the free space from the channel. The buffer is released if reading fails.
-    */
-   private void fillBuffer() throws IOException {
-
-     int tailBytes = 0;
-     if (this.readBytes > 0) {
-       //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes)
-       this.buffer.markReaderIndex();
-       this.buffer.discardReadBytes(); // compact the buffer
-       tailBytes = this.buffer.writerIndex();
-       if (!this.buffer.isWritable()) {
-         // a single line is larger than the buffer
-         BufferPool.ensureWritable(buffer, bufferSize * 2);
-         this.bufferSize = buffer.capacity();
-       }
-       this.startIndex = 0;
-     }
-
-     boolean release = true;
-     try {
-       int readBytes = tailBytes;
-       for (; ; ) {
-         int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes);
-         if (localReadBytes < 0) {
-           if (buffer.isWritable()) {
-             // the channel ended before the buffer was full: there are no more bytes to read
-             eof = true;
-           }
-           break;
-         }
-         readBytes += localReadBytes;
-         if (readBytes == bufferSize) {
-           break;
-         }
-       }
-       this.readBytes += (readBytes - tailBytes);
-       release = false;
-
-       // resume scanning after the tail bytes, which were already scanned before the refill
-       this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes);
-     } finally {
-       if (release) {
-         buffer.release();
-       }
-     }
-   }
-
-   /**
-    * Read a line terminated by one of CR, LF, or CRLF.
-    *
-    * @param reads receives the number of bytes consumed by this call, terminator included
-    * @return a slice of the internal buffer holding the line without its terminator, or
-    *     {@code null} when no further line is available. The slice shares the internal buffer,
-    *     so its contents may change on the next read.
-    */
-   public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
-     int readBytes = 0; // newline + text line bytes
-     int newlineLength = 0; //length of terminating newline
-     int readable;
-
-     this.startIndex = buffer.readerIndex();
-
-     loop:
-     while (true) {
-       readable = buffer.readableBytes();
-       if (readable <= 0) {
-         buffer.readerIndex(this.startIndex);
-         fillBuffer(); //compact and fill buffer
-
-         //if buffer.writerIndex() is zero, there is no bytes in buffer
-         if (!buffer.isReadable() && buffer.writerIndex() == 0) {
-           reads.set(0);
-           return null;
-         } else {
-           // a CR ended the previous fill: skip the LF that completes the CRLF pair
-           if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
-             buffer.skipBytes(1);
-             if(eof && !buffer.isReadable()) {
-               reads.set(1);
-               return null;
-             }
-
-             newlineLength++;
-             readBytes++;
-             startIndex = buffer.readerIndex();
-           }
-         }
-         readable = buffer.readableBytes();
-       }
-
-       int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
-       if (endIndex < 0) {
-         // no terminating newline in the buffered bytes yet
-         buffer.readerIndex(buffer.writerIndex()); // set to end buffer
-         if(eof){
-           readBytes += (buffer.readerIndex() - startIndex);
-           break loop;
-         }
-       } else {
-         buffer.readerIndex(endIndex + 1);
-         readBytes += (buffer.readerIndex() - startIndex); //past newline + text line
-
-         //appeared terminating CRLF
-         if (processor.isPrevCharCR() && buffer.isReadable()
-             && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
-           buffer.skipBytes(1);
-           readBytes++;
-           newlineLength += 2;
-         } else {
-           newlineLength += 1;
-         }
-         break loop;
-       }
-     }
-     reads.set(readBytes);
-     return buffer.slice(startIndex, readBytes - newlineLength);
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
deleted file mode 100644
index 1599f62..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.FieldSerializerDeserializer;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
- /**
-  * Splits one delimited text line into fields and decodes only the projected columns into the
-  * output tuple.
-  */
- public class CSVLineDeserializer extends TextLineDeserializer {
-   private FieldSplitProcessor processor;
-   private FieldSerializerDeserializer fieldSerDer;
-   private ByteBuf nullChars;
-
-   public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
-     super(schema, meta, targetColumnIndexes);
-   }
-
-   /** (Re)builds the field splitter, the null-marker buffer and the field decoder from the table meta. */
-   @Override
-   public void init() {
-     this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
-
-     ByteBuf previousMarker = nullChars;
-     if (previousMarker != null) {
-       previousMarker.release(); // drop the marker allocated by an earlier init()
-     }
-     this.nullChars = TextLineSerDe.getNullChars(meta);
-
-     this.fieldSerDer = new TextFieldSerializerDeserializer(meta);
-   }
-
-   /**
-    * Decodes the projected fields of {@code lineBuf} into {@code output}. The buffer's reader and
-    * writer indexes are moved to each decoded field while doing so.
-    */
-   public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
-     final int[] wanted = targetColumnIndexes;
-     boolean nothingToDo = lineBuf == null || wanted == null || wanted.length == 0;
-     if (nothingToDo) {
-       return;
-     }
-
-     final int lineLength = lineBuf.readableBytes();
-     int fieldStart = 0;
-     int fieldIndex = 0; // column the current field belongs to
-     int nextTarget = 0; // position in 'wanted' of the next column to decode
-
-     for (;;) {
-       int delimiterPos = lineBuf.forEachByte(fieldStart, lineLength - fieldStart, processor);
-       // the last field runs to the end of the line
-       int fieldEnd = (delimiterPos < 0) ? lineLength : delimiterPos;
-
-       boolean projected = nextTarget < wanted.length && wanted[nextTarget] == fieldIndex;
-       if (projected) {
-         lineBuf.setIndex(fieldStart, fieldEnd);
-         Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(fieldIndex), fieldIndex, nullChars);
-         output.put(fieldIndex, datum);
-         nextTarget++;
-       }
-
-       if (nextTarget == wanted.length || delimiterPos == -1) {
-         return;
-       }
-
-       fieldStart = delimiterPos + 1;
-       fieldIndex++;
-     }
-   }
-
-   /** Releases the null-marker buffer, if any. */
-   @Override
-   public void release() {
-     if (nullChars == null) {
-       return;
-     }
-     nullChars.release();
-     nullChars = null;
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
deleted file mode 100644
index 2fe7f23..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.StorageConstants;
-
-public class CSVLineSerDe extends TextLineSerDe {
- @Override
- public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
- return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
- }
-
- @Override
- public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
- return new CSVLineSerializer(schema, meta);
- }
-
- public static char getFieldDelimiter(TableMeta meta) {
- return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
- StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
deleted file mode 100644
index c0fc18f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.FieldSerializerDeserializer;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
- /**
-  * Writes every column of a tuple as text, separating consecutive fields with the table's field
-  * delimiter (written as a single byte).
-  */
- public class CSVLineSerializer extends TextLineSerializer {
-   private FieldSerializerDeserializer serde;
-
-   private byte[] nullChars;
-   private char delimiter;
-   private int columnNum;
-
-   public CSVLineSerializer(Schema schema, TableMeta meta) {
-     super(schema, meta);
-   }
-
-   /** Loads the null marker, delimiter and column count from the table meta and schema. */
-   @Override
-   public void init() {
-     this.nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
-     this.delimiter = CSVLineSerDe.getFieldDelimiter(meta);
-     this.columnNum = schema.size();
-
-     this.serde = new TextFieldSerializerDeserializer(meta);
-   }
-
-   /**
-    * Serializes {@code input} to {@code out} without a trailing line terminator.
-    *
-    * @return the number of bytes written
-    */
-   @Override
-   public int serialize(OutputStream out, Tuple input) throws IOException {
-     final int lastColumn = columnNum - 1;
-     int total = 0;
-
-     for (int col = 0; col < columnNum; col++) {
-       total += serde.serialize(out, input.get(col), schema.getColumn(col), col, nullChars);
-       if (col < lastColumn) {
-         out.write((byte) delimiter);
-         total += 1;
-       }
-     }
-     return total;
-   }
-
-   /** Nothing to release: this serializer holds no pooled resources. */
-   @Override
-   public void release() {
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
deleted file mode 100644
index 0efe030..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.tajo.common.exception.NotImplementedException;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.ByteBufInputChannel;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
- /**
-  * Reads lines from the byte range of a {@link FileFragment}, decompressing the stream when the
-  * file name matches a (non-splittable) compression codec.
-  */
- public class DelimitedLineReader implements Closeable {
-   private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
-   private static final int DEFAULT_PAGE_SIZE = 128 * 1024;
-
-   private FileSystem fs;
-   private FSDataInputStream fis;
-   private InputStream is; //decompressed stream
-   private CompressionCodecFactory factory;
-   private CompressionCodec codec;
-   private Decompressor decompressor;
-
-   private long startOffset, end, pos;
-   private boolean eof = true;
-   private ByteBufLineReader lineReader;
-   private AtomicInteger lineReadBytes = new AtomicInteger();
-   private FileFragment fragment;
-   private Configuration conf;
-
-   /**
-    * Resolves the codec for the fragment's file. Splittable codecs are rejected with
-    * {@link NotImplementedException}.
-    */
-   public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
-     this.fragment = fragment;
-     this.conf = conf;
-     this.factory = new CompressionCodecFactory(conf);
-     this.codec = factory.getCodec(fragment.getPath());
-     if (this.codec instanceof SplittableCompressionCodec) {
-       // bzip2 does not support multi-thread model
-       throw new NotImplementedException();
-     }
-   }
-
-   /** Opens the file (once), positions it at the fragment start and creates the line reader. */
-   public void init() throws IOException {
-     if (fs == null) {
-       fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
-     }
-     if (fis == null) {
-       fis = fs.open(fragment.getPath());
-     }
-     startOffset = fragment.getStartKey();
-     pos = startOffset;
-     end = startOffset + fragment.getEndKey();
-
-     ByteBufInputChannel channel;
-     int pageSize;
-     if (codec == null) {
-       fis.seek(startOffset);
-       is = fis;
-       channel = new ByteBufInputChannel(is);
-       // NOTE(review): 'end' is the absolute end offset, not the fragment length,
-       // so this cap only shrinks the page for fragments ending near the file start.
-       pageSize = (int) Math.min(DEFAULT_PAGE_SIZE, end);
-     } else {
-       decompressor = CodecPool.getDecompressor(codec);
-       is = new DataInputStream(codec.createInputStream(fis, decompressor));
-       channel = new ByteBufInputChannel(is);
-       pageSize = DEFAULT_PAGE_SIZE;
-     }
-     lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(pageSize));
-     eof = false;
-   }
-
-   /** Returns the raw file position for compressed input, otherwise the logical position. */
-   public long getCompressedPosition() throws IOException {
-     return isCompressed() ? fis.getPos() : pos;
-   }
-
-   public long getUnCompressedPosition() throws IOException {
-     return pos;
-   }
-
-   /** Returns the number of (uncompressed) bytes consumed since the fragment start. */
-   public long getReadBytes() {
-     return pos - startOffset;
-   }
-
-   public boolean isReadable() {
-     return !eof;
-   }
-
-   /**
-    * Returns the next line, or {@code null} once the input is exhausted. For uncompressed input,
-    * reading stops after the line that crosses the fragment end.
-    */
-   public ByteBuf readLine() throws IOException {
-     if (eof) {
-       return null;
-     }
-
-     ByteBuf line = lineReader.readLineBuf(lineReadBytes);
-     pos += lineReadBytes.get();
-
-     boolean pastFragmentEnd = !isCompressed() && getCompressedPosition() > end;
-     if (line == null || pastFragmentEnd) {
-       eof = true;
-     }
-     return line;
-   }
-
-   public boolean isCompressed() {
-     return codec != null;
-   }
-
-   /** Closes all streams and returns the decompressor to the pool. */
-   @Override
-   public void close() throws IOException {
-     try {
-       IOUtils.cleanup(LOG, lineReader, is, fis);
-       fs = null;
-       is = null;
-       fis = null;
-       lineReader = null;
-     } finally {
-       if (decompressor != null) {
-         CodecPool.returnDecompressor(decompressor);
-         decompressor = null;
-       }
-     }
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
deleted file mode 100644
index ab8a0b5..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.ReflectionUtil;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
-import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
-
-public class DelimitedTextFile {
-
-   /** Line terminator written after every serialized row. */
-   public static final byte LF = '\n';
-   /** NOTE(review): presumably an end-of-stream sentinel; not referenced in this class. */
-   public static final int EOF = -1;
-
-   private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
-
-   /** it caches line serde classes, keyed by class name. */
-   private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
-       new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
-
-   /**
-    * By default, DelimitedTextFileScanner uses CSVLineSerDe. If a table property 'text.serde.class' is given,
-    * it will use the specified serde class.
-    *
-    * @return a new TextLineSerDe instance
-    * @throws RuntimeException if the class cannot be loaded, is not a {@link TextLineSerDe},
-    *     or cannot be instantiated
-    */
-   public static TextLineSerDe getLineSerde(TableMeta meta) {
-     // if there is no given serde class, it will use CSV line serde.
-     String serDeClassName =
-         meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
-
-     try {
-       Class<? extends TextLineSerDe> serdeClass = serdeClassCache.get(serDeClassName);
-       if (serdeClass == null) {
-         // asSubclass rejects a class of the wrong type before it is cached
-         serdeClass = Class.forName(serDeClassName).asSubclass(TextLineSerDe.class);
-         serdeClassCache.put(serDeClassName, serdeClass);
-       }
-       return (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
-     } catch (Throwable e) {
-       throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
-     }
-   }
-
-   /**
-    * Appender that writes each tuple as one line produced by the table's {@link TextLineSerializer},
-    * terminated by {@code '\n'}, optionally compressed with the codec named by
-    * {@link StorageConstants#COMPRESSION_CODEC}.
-    */
-   public static class DelimitedTextFileAppender extends FileAppender {
-     /** Rows are buffered in memory and flushed once more than this many bytes are pending. */
-     private static final int BUFFER_SIZE = 128 * 1024;
-
-     private final TableMeta meta;
-     private final Schema schema;
-     private final FileSystem fs;
-     private FSDataOutputStream fos;
-     private DataOutputStream outputStream;
-     private CompressionOutputStream deflateFilter;
-     private TableStatistics stats = null;
-     private Compressor compressor;
-     private CompressionCodecFactory codecFactory;
-     private CompressionCodec codec;
-     private Path compressedPath;
-     private int bufferedBytes = 0;
-     /** File position at init() plus every row byte appended since (uncompressed, incl. buffered). */
-     private long pos = 0;
-
-     private NonSyncByteArrayOutputStream os;
-     private TextLineSerializer serializer;
-
-     public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
-         throws IOException {
-       super(conf, schema, meta, path);
-       this.fs = path.getFileSystem(conf);
-       this.meta = meta;
-       this.schema = schema;
-     }
-
-     public TextLineSerDe getLineSerde() {
-       return DelimitedTextFile.getLineSerde(meta);
-     }
-
-     /**
-      * Creates the output file (with the codec's extension when compressed) and the serializer.
-      *
-      * @throws FileNotFoundException if the parent directory does not exist
-      * @throws AlreadyExistsStorageException if the target file already exists
-      */
-     @Override
-     public void init() throws IOException {
-       if (!fs.exists(path.getParent())) {
-         throw new FileNotFoundException(path.toString());
-       }
-
-       if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
-         String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
-         codecFactory = new CompressionCodecFactory(conf);
-         codec = codecFactory.getCodecByClassName(codecName);
-         compressor = CodecPool.getCompressor(codec);
-         if (compressor != null) compressor.reset(); //builtin gzip is null
-
-         String extension = codec.getDefaultExtension();
-         compressedPath = path.suffix(extension);
-
-         if (fs.exists(compressedPath)) {
-           throw new AlreadyExistsStorageException(compressedPath);
-         }
-
-         fos = fs.create(compressedPath);
-         deflateFilter = codec.createOutputStream(fos, compressor);
-         outputStream = new DataOutputStream(deflateFilter);
-
-       } else {
-         if (fs.exists(path)) {
-           throw new AlreadyExistsStorageException(path);
-         }
-         fos = fs.create(path);
-         outputStream = new DataOutputStream(new BufferedOutputStream(fos));
-       }
-
-       if (enabledStats) {
-         this.stats = new TableStatistics(this.schema);
-       }
-
-       serializer = getLineSerde().createSerializer(schema, meta);
-       serializer.init();
-
-       if (os == null) {
-         os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
-       }
-
-       os.reset();
-       pos = fos.getPos();
-       bufferedBytes = 0;
-       super.init();
-     }
-
-     /** Serializes {@code tuple} plus a line feed into the row buffer, flushing it when it is full. */
-     @Override
-     public void addTuple(Tuple tuple) throws IOException {
-       // write
-       int rowBytes = serializer.serialize(os, tuple);
-
-       // new line
-       os.write(LF);
-       rowBytes += 1;
-
-       // update positions
-       pos += rowBytes;
-       bufferedBytes += rowBytes;
-
-       // flush buffer if necessary
-       if (bufferedBytes > BUFFER_SIZE) {
-         flushBuffer();
-       }
-       // Statistical section
-       if (enabledStats) {
-         stats.incrementRow();
-       }
-     }
-
-     /** Moves the buffered rows to the output stream. */
-     private void flushBuffer() throws IOException {
-       if (os.getLength() > 0) {
-         os.writeTo(outputStream);
-         os.reset();
-         bufferedBytes = 0;
-       }
-     }
-
-     @Override
-     public long getOffset() throws IOException {
-       return pos;
-     }
-
-     @Override
-     public void flush() throws IOException {
-       flushBuffer();
-       outputStream.flush();
-     }
-
-     /**
-      * Flushes pending rows, finishes compression and closes the file. Safe to call after an
-      * {@link #init()} that failed partway: resources never created are skipped.
-      */
-     @Override
-     public void close() throws IOException {
-
-       try {
-         if (serializer != null) {
-           serializer.release();
-         }
-
-         if (outputStream != null) {
-           flush();
-         }
-
-         // Statistical section
-         if (enabledStats && stats != null) {
-           stats.setNumBytes(getOffset());
-         }
-
-         if (deflateFilter != null) {
-           deflateFilter.finish();
-           deflateFilter.resetState();
-           deflateFilter = null;
-         }
-
-         if (os != null) {
-           os.close();
-         }
-       } finally {
-         IOUtils.cleanup(LOG, fos);
-         if (compressor != null) {
-           CodecPool.returnCompressor(compressor);
-           compressor = null;
-         }
-       }
-     }
-
-     @Override
-     public TableStats getStats() {
-       if (enabledStats) {
-         return stats.getTableStat();
-       } else {
-         return null;
-       }
-     }
-
-     /**
-      * Returns whether output is compressed. This is keyed on the codec because some codecs
-      * (e.g. builtin gzip) provide no pooled compressor, and the compressor is cleared by close().
-      */
-     public boolean isCompress() {
-       return codec != null;
-     }
-
-     public String getExtension() {
-       return codec != null ? codec.getDefaultExtension() : "";
-     }
-   }
-
-   /**
-    * Scanner that reads the lines of a file fragment and deserializes them with the table's
-    * {@link TextLineDeserializer}. Lines that fail to parse are skipped until the configured
-    * error tolerance (a negative limit tolerates all errors) is exceeded.
-    */
-   public static class DelimitedTextFileScanner extends FileScanner {
-     private boolean splittable = false;
-     private final long startOffset;
-
-     private final long endOffset;
-     /** The number of actual read records */
-     private int recordCount = 0;
-     private int[] targetColumnIndexes;
-
-     private DelimitedLineReader reader;
-     private TextLineDeserializer deserializer;
-
-     /** At most this many parse errors are logged individually. */
-     private int errorPrintOutMaxNum = 5;
-     /** Maximum number of permissible errors; a negative value means unlimited. */
-     private int errorToleranceMaxNum;
-     /** How many errors have occurred? */
-     private int errorNum;
-
-     public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
-                                     final FileFragment fragment)
-         throws IOException {
-       super(conf, schema, meta, fragment);
-       reader = new DelimitedLineReader(conf, fragment);
-       if (!reader.isCompressed()) {
-         splittable = true;
-       }
-
-       startOffset = fragment.getStartKey();
-       endOffset = startOffset + fragment.getEndKey();
-
-       errorToleranceMaxNum =
-           Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
-     }
-
-     /** (Re)opens the fragment, resolves the projected columns and creates a fresh deserializer. */
-     @Override
-     public void init() throws IOException {
-       if (reader != null) {
-         reader.close();
-       }
-
-       reader = new DelimitedLineReader(conf, fragment);
-       reader.init();
-       recordCount = 0;
-
-       if (targets == null) {
-         targets = schema.toArray();
-       }
-
-       targetColumnIndexes = new int[targets.length];
-       for (int i = 0; i < targets.length; i++) {
-         targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
-       }
-
-       super.init();
-       Arrays.sort(targetColumnIndexes);
-       if (LOG.isDebugEnabled()) {
-         LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
-       }
-
-       if (startOffset > 0) {
-         // the reader of the preceding fragment consumes the line crossing its end offset
-         reader.readLine(); // skip first line;
-       }
-
-       // release the deserializer of a previous init() (e.g. via reset()) so its buffers are not leaked
-       if (deserializer != null) {
-         deserializer.release();
-         deserializer = null;
-       }
-       deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
-       deserializer.init();
-     }
-
-     public TextLineSerDe getLineSerde() {
-       return DelimitedTextFile.getLineSerde(meta);
-     }
-
-     /** Returns the fraction of the fragment consumed, based on the (compressed) file position. */
-     @Override
-     public float getProgress() {
-       try {
-         // reader is null after close()
-         if (reader == null || !reader.isReadable()) {
-           return 1.0f;
-         }
-         long filePos = reader.getCompressedPosition();
-         if (startOffset == filePos) {
-           return 0.0f;
-         } else {
-           long readBytes = filePos - startOffset;
-           long remainingBytes = Math.max(endOffset - filePos, 0);
-           return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
-         }
-       } catch (IOException e) {
-         LOG.error(e.getMessage(), e);
-         return 0.0f;
-       }
-     }
-
-     /**
-      * Returns the next tuple, skipping lines that fail to parse, or {@code null} when the
-      * fragment is exhausted.
-      *
-      * @throws IOException wrapping any failure, including a parse error once the error
-      *     tolerance limit is exceeded
-      */
-     @Override
-     public Tuple next() throws IOException {
-       if (!reader.isReadable()) {
-         return null;
-       }
-
-       try {
-         // this loop continues until one tuple is built or the end of stream is reached.
-         while (reader.isReadable()) {
-           ByteBuf buf = reader.readLine();
-
-           // if no more line, then return EOT (end of tuple)
-           if (buf == null) {
-             return null;
-           }
-
-           // If there is no required column, we just read each line
-           // and then return an empty tuple without parsing line.
-           if (targets.length == 0) {
-             recordCount++;
-             return EmptyTuple.get();
-           }
-
-           VTuple tuple = new VTuple(schema.size());
-           try {
-             deserializer.deserialize(buf, tuple);
-           } catch (TextLineParsingError tae) {
-             errorNum++;
-
-             // suppress too many log prints, which probably cause performance degradation
-             if (errorNum <= errorPrintOutMaxNum) {
-               LOG.warn("Ignore Text Parse Error (" + errorNum + "): ", tae);
-             }
-
-             // Only when the maximum error tolerance limit is set (i.e., errorToleranceMaxNum >= 0),
-             // it checks if the number of parsing errors exceeds the max limit.
-             // Otherwise, it will ignore all parsing errors.
-             if (errorToleranceMaxNum >= 0 && errorNum > errorToleranceMaxNum) {
-               throw tae;
-             }
-             continue; // skip the malformed line
-           }
-
-           // recordCount means the number of actual read records.
-           recordCount++;
-           return tuple;
-         }
-
-         // every remaining line was malformed: report end of data instead of
-         // returning a partially deserialized tuple
-         return null;
-
-       } catch (Throwable t) {
-         LOG.error(t);
-         throw new IOException(t);
-       }
-     }
-
-     @Override
-     public void reset() throws IOException {
-       init();
-     }
-
-     /** Releases the deserializer, records read statistics and closes the line reader. */
-     @Override
-     public void close() throws IOException {
-       try {
-         if (deserializer != null) {
-           deserializer.release();
-           deserializer = null;
-         }
-
-         if (tableStats != null && reader != null) {
-           tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
-           tableStats.setNumRows(recordCount);
-         }
-         if (LOG.isDebugEnabled()) {
-           LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
-         }
-       } finally {
-         IOUtils.cleanup(LOG, reader);
-         reader = null;
-       }
-     }
-
-     @Override
-     public boolean isProjectable() {
-       return true;
-     }
-
-     @Override
-     public boolean isSelectable() {
-       return false;
-     }
-
-     @Override
-     public void setSearchCondition(Object expr) {
-     }
-
-     @Override
-     public boolean isSplittable() {
-       return splittable;
-     }
-
-     @Override
-     public TableStats getInputStats() {
-       if (tableStats != null && reader != null) {
-         tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
-         tableStats.setNumRows(recordCount);
-         tableStats.setNumBytes(fragment.getEndKey());
-       }
-       return tableStats;
-     }
-   }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
deleted file mode 100644
index a5ac142..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBufProcessor;
-
-public class FieldSplitProcessor implements ByteBufProcessor {
- private char delimiter; //the ascii separate character
-
- public FieldSplitProcessor(char recordDelimiterByte) {
- this.delimiter = recordDelimiterByte;
- }
-
- @Override
- public boolean process(byte value) throws Exception {
- return delimiter != value;
- }
-
- public char getDelimiter() {
- return delimiter;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
deleted file mode 100644
index a130527..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBufProcessor;
-
-public class LineSplitProcessor implements ByteBufProcessor {
- public static final byte CR = '\r';
- public static final byte LF = '\n';
- private boolean prevCharCR = false; //true of prev char was CR
-
- @Override
- public boolean process(byte value) throws Exception {
- switch (value) {
- case LF:
- return false;
- case CR:
- prevCharCR = true;
- return false;
- default:
- prevCharCR = false;
- return true;
- }
- }
-
- public boolean isPrevCharCR() {
- return prevCharCR;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
deleted file mode 100644
index ae7565d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import com.google.protobuf.Message;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.CharsetUtil;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.storage.FieldSerializerDeserializer;
-import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.util.NumberUtil;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.CharsetDecoder;
-import java.util.TimeZone;
-
-public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
- public static final byte[] trueBytes = "true".getBytes();
- public static final byte[] falseBytes = "false".getBytes();
- private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
- private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
-
- private final boolean hasTimezone;
- private final TimeZone timezone;
-
- public TextFieldSerializerDeserializer(TableMeta meta) {
- hasTimezone = meta.containsOption(StorageConstants.TIMEZONE);
- timezone = TimeZone.getTimeZone(meta.getOption(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE));
- }
-
- private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
- return !val.isReadable() || nullBytes.equals(val);
- }
-
- private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
- return val.readableBytes() > 0 && nullBytes.equals(val);
- }
-
- @Override
- public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars)
- throws IOException {
- byte[] bytes;
- int length = 0;
- TajoDataTypes.DataType dataType = col.getDataType();
-
- if (datum == null || datum instanceof NullDatum) {
- switch (dataType.getType()) {
- case CHAR:
- case TEXT:
- length = nullChars.length;
- out.write(nullChars);
- break;
- default:
- break;
- }
- return length;
- }
-
- switch (dataType.getType()) {
- case BOOLEAN:
- out.write(datum.asBool() ? trueBytes : falseBytes);
- length = trueBytes.length;
- break;
- case CHAR:
- byte[] pad = new byte[dataType.getLength() - datum.size()];
- bytes = datum.asTextBytes();
- out.write(bytes);
- out.write(pad);
- length = bytes.length + pad.length;
- break;
- case TEXT:
- case BIT:
- case INT2:
- case INT4:
- case INT8:
- case FLOAT4:
- case FLOAT8:
- case INET4:
- case DATE:
- case INTERVAL:
- bytes = datum.asTextBytes();
- length = bytes.length;
- out.write(bytes);
- break;
- case TIME:
- if (hasTimezone) {
- bytes = ((TimeDatum) datum).asChars(timezone, true).getBytes();
- } else {
- bytes = datum.asTextBytes();
- }
- length = bytes.length;
- out.write(bytes);
- break;
- case TIMESTAMP:
- if (hasTimezone) {
- bytes = ((TimestampDatum) datum).asChars(timezone, true).getBytes();
- } else {
- bytes = datum.asTextBytes();
- }
- length = bytes.length;
- out.write(bytes);
- break;
- case INET6:
- case BLOB:
- bytes = Base64.encodeBase64(datum.asByteArray(), false);
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case PROTOBUF:
- ProtobufDatum protobuf = (ProtobufDatum) datum;
- byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
- length = protoBytes.length;
- out.write(protoBytes, 0, protoBytes.length);
- break;
- case NULL_TYPE:
- default:
- break;
- }
- return length;
- }
-
- @Override
- public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
- Datum datum;
- TajoDataTypes.Type type = col.getDataType().getType();
- boolean nullField;
- if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
- nullField = isNullText(buf, nullChars);
- } else {
- nullField = isNull(buf, nullChars);
- }
-
- if (nullField) {
- datum = NullDatum.get();
- } else {
- switch (type) {
- case BOOLEAN:
- byte bool = buf.readByte();
- datum = DatumFactory.createBool(bool == 't' || bool == 'T');
- break;
- case BIT:
- datum = DatumFactory.createBit(Byte.parseByte(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
- break;
- case CHAR:
- datum = DatumFactory.createChar(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
- break;
- case INT1:
- case INT2:
- datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
- break;
- case INT4:
- datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
- break;
- case INT8:
- datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
- break;
- case FLOAT4:
- datum = DatumFactory.createFloat4(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
- break;
- case FLOAT8:
- datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
- break;
- case TEXT: {
- byte[] bytes = new byte[buf.readableBytes()];
- buf.readBytes(bytes);
- datum = DatumFactory.createText(bytes);
- break;
- }
- case DATE:
- datum = DatumFactory.createDate(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
- break;
- case TIME:
- if (hasTimezone) {
- datum = DatumFactory.createTime(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
- } else {
- datum = DatumFactory.createTime(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
- }
- break;
- case TIMESTAMP:
- if (hasTimezone) {
- datum = DatumFactory.createTimestamp(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
- } else {
- datum = DatumFactory.createTimestamp(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
- }
- break;
- case INTERVAL:
- datum = DatumFactory.createInterval(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
- break;
- case PROTOBUF: {
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
- Message.Builder builder = factory.newBuilder();
- try {
- byte[] bytes = new byte[buf.readableBytes()];
- buf.readBytes(bytes);
- protobufJsonFormat.merge(bytes, builder);
- datum = factory.createDatum(builder.build());
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- break;
- }
- case INET4:
- datum = DatumFactory.createInet4(
- decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
- break;
- case BLOB: {
- byte[] bytes = new byte[buf.readableBytes()];
- buf.readBytes(bytes);
- datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
- break;
- }
- default:
- datum = NullDatum.get();
- break;
- }
- }
- return datum;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
deleted file mode 100644
index 7ebfa79..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-/**
- * Reads a text line and fills a Tuple with values
- */
-public abstract class TextLineDeserializer {
- protected Schema schema;
- protected TableMeta meta;
- protected int [] targetColumnIndexes;
-
- public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
- this.schema = schema;
- this.meta = meta;
- this.targetColumnIndexes = targetColumnIndexes;
- }
-
- /**
- * Initialize SerDe
- */
- public abstract void init();
-
- /**
- * It fills a tuple with a read fields in a given line.
- *
- * @param buf Read line
- * @param output Tuple to be filled with read fields
- * @throws java.io.IOException
- */
- public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
-
- /**
- * Release external resources
- */
- public abstract void release();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
deleted file mode 100644
index f0bae5e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
/**
 * Checked exception signalling that a text line could not be parsed.
 */
public class TextLineParsingError extends Exception {

  private static final long serialVersionUID = 1L;

  /**
   * @param t the underlying parse failure, kept as the cause
   */
  public TextLineParsingError(Throwable t) {
    super(t);
  }

  /**
   * Builds a message of the form {@code "<cause message>, Error line: <message>"}.
   *
   * @param message the offending line (or a description of it)
   * @param t the underlying parse failure; kept as the cause so its stack trace is not lost.
   *          May be {@code null}, in which case the message starts with {@code "null"}.
   */
  public TextLineParsingError(String message, Throwable t) {
    super((t == null ? null : t.getMessage()) + ", Error line: " + message, t);
  }

}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
deleted file mode 100644
index e81e289..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.StorageConstants;
-
-/**
- * Pluggable Text Line SerDe class
- */
-public abstract class TextLineSerDe {
-
- public TextLineSerDe() {
- }
-
- public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes);
-
- public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
-
- public static ByteBuf getNullChars(TableMeta meta) {
- byte[] nullCharByteArray = getNullCharsAsBytes(meta);
-
- ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length);
- nullChars.writeBytes(nullCharByteArray);
-
- return nullChars;
- }
-
- public static byte [] getNullCharsAsBytes(TableMeta meta) {
- byte [] nullChars;
-
- String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
- NullDatum.DEFAULT_TEXT));
- if (StringUtils.isEmpty(nullCharacters)) {
- nullChars = NullDatum.get().asTextBytes();
- } else {
- nullChars = nullCharacters.getBytes();
- }
-
- return nullChars;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
deleted file mode 100644
index 0c2761f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Write a Tuple into single text formatted line
- */
-public abstract class TextLineSerializer {
- protected Schema schema;
- protected TableMeta meta;
-
- public TextLineSerializer(Schema schema, TableMeta meta) {
- this.schema = schema;
- this.meta = meta;
- }
-
- public abstract void init();
-
- public abstract int serialize(OutputStream out, Tuple input) throws IOException;
-
- public abstract void release();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
deleted file mode 100644
index 543336f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.BadConfigurationException;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-class CodecFactory {
-
- public class BytesDecompressor {
-
- private final CompressionCodec codec;
- private final Decompressor decompressor;
-
- public BytesDecompressor(CompressionCodec codec) {
- this.codec = codec;
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- } else {
- decompressor = null;
- }
- }
-
- public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
- final BytesInput decompressed;
- if (codec != null) {
- decompressor.reset();
- InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
- decompressed = BytesInput.from(is, uncompressedSize);
- } else {
- decompressed = bytes;
- }
- return decompressed;
- }
-
- private void release() {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
- }
-
- /**
- * Encapsulates the logic around hadoop compression
- *
- * @author Julien Le Dem
- *
- */
- public static class BytesCompressor {
-
- private final CompressionCodec codec;
- private final Compressor compressor;
- private final ByteArrayOutputStream compressedOutBuffer;
- private final CompressionCodecName codecName;
-
- public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
- this.codecName = codecName;
- this.codec = codec;
- if (codec != null) {
- this.compressor = CodecPool.getCompressor(codec);
- this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
- } else {
- this.compressor = null;
- this.compressedOutBuffer = null;
- }
- }
-
- public BytesInput compress(BytesInput bytes) throws IOException {
- final BytesInput compressedBytes;
- if (codec == null) {
- compressedBytes = bytes;
- } else {
- compressedOutBuffer.reset();
- if (compressor != null) {
- // null compressor for non-native gzip
- compressor.reset();
- }
- CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
- bytes.writeAllTo(cos);
- cos.finish();
- cos.close();
- compressedBytes = BytesInput.from(compressedOutBuffer);
- }
- return compressedBytes;
- }
-
- private void release() {
- if (compressor != null) {
- CodecPool.returnCompressor(compressor);
- }
- }
-
- public CompressionCodecName getCodecName() {
- return codecName;
- }
-
- }
-
- private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
- private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
- private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
- private final Configuration configuration;
-
- public CodecFactory(Configuration configuration) {
- this.configuration = configuration;
- }
-
- /**
- *
- * @param codecName the requested codec
- * @return the corresponding hadoop codec. null if UNCOMPRESSED
- */
- private CompressionCodec getCodec(CompressionCodecName codecName) {
- String codecClassName = codecName.getHadoopCompressionCodecClassName();
- if (codecClassName == null) {
- return null;
- }
- CompressionCodec codec = codecByName.get(codecClassName);
- if (codec != null) {
- return codec;
- }
-
- try {
- Class<?> codecClass = Class.forName(codecClassName);
- codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
- codecByName.put(codecClassName, codec);
- return codec;
- } catch (ClassNotFoundException e) {
- throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
- }
- }
-
- public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
- BytesCompressor comp = compressors.get(codecName);
- if (comp == null) {
- CompressionCodec codec = getCodec(codecName);
- comp = new BytesCompressor(codecName, codec, pageSize);
- compressors.put(codecName, comp);
- }
- return comp;
- }
-
- public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
- BytesDecompressor decomp = decompressors.get(codecName);
- if (decomp == null) {
- CompressionCodec codec = getCodec(codecName);
- decomp = new BytesDecompressor(codec);
- decompressors.put(codecName, decomp);
- }
- return decomp;
- }
-
- public void release() {
- for (BytesCompressor compressor : compressors.values()) {
- compressor.release();
- }
- compressors.clear();
- for (BytesDecompressor decompressor : decompressors.values()) {
- decompressor.release();
- }
- decompressors.clear();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
deleted file mode 100644
index 5f89ead..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.INFO;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.statistics.BooleanStatistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-
-class ColumnChunkPageWriteStore implements PageWriteStore {
- private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
-
- private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-
- private static final class ColumnChunkPageWriter implements PageWriter {
-
- private final ColumnDescriptor path;
- private final BytesCompressor compressor;
-
- private final CapacityByteArrayOutputStream buf;
- private DictionaryPage dictionaryPage;
-
- private long uncompressedLength;
- private long compressedLength;
- private long totalValueCount;
- private int pageCount;
-
- private Set<Encoding> encodings = new HashSet<Encoding>();
-
- private Statistics totalStatistics;
-
- private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
- this.path = path;
- this.compressor = compressor;
- this.buf = new CapacityByteArrayOutputStream(initialSize);
- this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
- }
-
- @Deprecated
- @Override
- public void writePage(BytesInput bytes,
- int valueCount,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
- long uncompressedSize = bytes.size();
- BytesInput compressedBytes = compressor.compress(bytes);
- long compressedSize = compressedBytes.size();
- BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
- parquetMetadataConverter.writeDataPageHeader(
- (int)uncompressedSize,
- (int)compressedSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- buf);
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
- compressedBytes.writeAllTo(buf);
- encodings.add(rlEncoding);
- encodings.add(dlEncoding);
- encodings.add(valuesEncoding);
- }
-
- @Override
- public void writePage(BytesInput bytes,
- int valueCount,
- Statistics statistics,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
- long uncompressedSize = bytes.size();
- BytesInput compressedBytes = compressor.compress(bytes);
- long compressedSize = compressedBytes.size();
- parquetMetadataConverter.writeDataPageHeader(
- (int)uncompressedSize,
- (int)compressedSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- buf);
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
- this.totalStatistics.mergeStatistics(statistics);
- compressedBytes.writeAllTo(buf);
- encodings.add(rlEncoding);
- encodings.add(dlEncoding);
- encodings.add(valuesEncoding);
- }
-
- @Override
- public long getMemSize() {
- return buf.size();
- }
-
- public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
- writer.startColumn(path, totalValueCount, compressor.getCodecName());
- if (dictionaryPage != null) {
- writer.writeDictionaryPage(dictionaryPage);
- encodings.add(dictionaryPage.getEncoding());
- }
- writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
- writer.endColumn();
- if (INFO) {
- LOG.info(
- String.format(
- "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
- buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
- + (dictionaryPage != null ? String.format(
- ", dic { %,d entries, %,dB raw, %,dB comp}",
- dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
- : ""));
- }
- encodings.clear();
- pageCount = 0;
- }
-
- @Override
- public long allocatedSize() {
- return buf.getCapacity();
- }
-
- @Override
- public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- if (this.dictionaryPage != null) {
- throw new ParquetEncodingException("Only one dictionary page is allowed");
- }
- BytesInput dictionaryBytes = dictionaryPage.getBytes();
- int uncompressedSize = (int)dictionaryBytes.size();
- BytesInput compressedBytes = compressor.compress(dictionaryBytes);
- this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
- }
-
- @Override
- public String memUsageString(String prefix) {
- return buf.memUsageString(prefix + " ColumnChunkPageWriter");
- }
- }
-
- private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
- private final MessageType schema;
- private final BytesCompressor compressor;
- private final int initialSize;
-
- public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
- this.compressor = compressor;
- this.schema = schema;
- this.initialSize = initialSize;
- }
-
- @Override
- public PageWriter getPageWriter(ColumnDescriptor path) {
- if (!writers.containsKey(path)) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
- }
- return writers.get(path);
- }
-
- public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
- List<ColumnDescriptor> columns = schema.getColumns();
- for (ColumnDescriptor columnDescriptor : columns) {
- ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
- pageWriter.writeToFileWriter(writer);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
deleted file mode 100644
index 61567e5..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.PageReadStore;
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-
-/**
- * Iterates over the records of one Parquet file, loading one row group into memory at a time
- * and materializing each record through the {@link ReadSupport} supplied at construction.
- *
- * <p>{@link #initialize} must be called before {@link #nextKeyValue}.
- */
-class InternalParquetRecordReader<T> {
-  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
-
-  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
-
-  private MessageType requestedSchema;
-  private MessageType fileSchema;
-  private int columnCount;
-  private final ReadSupport<T> readSupport;
-
-  private RecordMaterializer<T> recordConverter;
-
-  private T currentValue;
-  /** Total number of records in the row groups passed to {@link #initialize}. */
-  private long total;
-  /** Records returned so far; a long so it can keep up with {@code total} beyond 2^31 rows. */
-  private long current = 0;
-  private int currentBlock = -1;
-  private ParquetFileReader reader;
-  private parquet.io.RecordReader<T> recordReader;
-  private UnboundRecordFilter recordFilter;
-
-  private long totalTimeSpentReadingBytes;
-  private long totalTimeSpentProcessingRecords;
-  private long startedAssemblingCurrentBlockAt;
-
-  /** Number of records contained in all row groups loaded so far. */
-  private long totalCountLoadedSoFar = 0;
-
-  private Path file;
-
-  /**
-   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
-   */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport) {
-    this(readSupport, null);
-  }
-
-  /**
-   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
-   * @param filter Optional record filter (may be null) passed to the per-row-group record reader.
-   */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
-    this.readSupport = readSupport;
-    this.recordFilter = filter;
-  }
-
-  /**
-   * Loads the next row group once every record of the already-loaded row groups has been
-   * returned, and prepares a record reader over it.
-   *
-   * @throws IOException if the file has no further row group although fewer than {@code total}
-   *     records were read, or if reading the row group fails
-   */
-  private void checkRead() throws IOException {
-    if (current != totalCountLoadedSoFar) {
-      return;
-    }
-    if (current != 0) {
-      accumulateAndLogAssemblyTime();
-    }
-
-    LOG.info("at row " + current + ". reading next block");
-    long t0 = System.currentTimeMillis();
-    PageReadStore pages = reader.readNextRowGroup();
-    if (pages == null) {
-      throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
-    }
-    long timeSpentReading = System.currentTimeMillis() - t0;
-    totalTimeSpentReadingBytes += timeSpentReading;
-    BenchmarkCounter.incrementTime(timeSpentReading);
-    LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
-    if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
-    MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
-    recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
-    startedAssemblingCurrentBlockAt = System.currentTimeMillis();
-    totalCountLoadedSoFar += pages.getRowCount();
-    ++currentBlock;
-  }
-
-  /** Adds the time spent assembling the finished row group to the totals and logs statistics. */
-  private void accumulateAndLogAssemblyTime() {
-    long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
-    totalTimeSpentProcessingRecords += timeAssembling;
-    LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
-    long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
-    // Both timers have millisecond granularity, so small row groups can leave totalTime at 0;
-    // long division by zero would throw ArithmeticException and abort the read.
-    if (totalTime != 0) {
-      long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
-      long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
-      LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
-    }
-  }
-
-  /**
-   * Closes the underlying file reader, if one was opened by {@link #initialize}.
-   *
-   * @throws IOException if closing the file reader fails
-   */
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-
-  /** Always returns null: records have no key. */
-  public Void getCurrentKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  /** Returns the record produced by the last successful {@link #nextKeyValue} call. */
-  public T getCurrentValue() throws IOException,
-      InterruptedException {
-    return currentValue;
-  }
-
-  /**
-   * Returns the fraction of records read so far.
-   *
-   * @return a value in [0, 1]; 1 when there are no records at all (instead of NaN from 0/0)
-   */
-  public float getProgress() throws IOException, InterruptedException {
-    if (total == 0) {
-      return 1.0f;
-    }
-    return (float) current / total;
-  }
-
-  /**
-   * Prepares the reader: builds the record converter through the read support, opens the file
-   * for the requested columns of {@code blocks}, and adds up their row counts.
-   *
-   * @throws IOException if the file reader cannot be created
-   */
-  public void initialize(MessageType requestedSchema, MessageType fileSchema,
-      Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
-      Path file, List<BlockMetaData> blocks, Configuration configuration)
-      throws IOException {
-    this.requestedSchema = requestedSchema;
-    this.fileSchema = fileSchema;
-    this.file = file;
-    this.columnCount = this.requestedSchema.getPaths().size();
-    this.recordConverter = readSupport.prepareForRead(
-        configuration, extraMetadata, fileSchema,
-        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
-
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    reader = new ParquetFileReader(configuration, file, blocks, columns);
-    for (BlockMetaData block : blocks) {
-      total += block.getRowCount();
-    }
-    LOG.info("RecordReader initialized will read a total of " + total + " records.");
-  }
-
-  /**
-   * Advances to the next record, loading the next row group when needed.
-   *
-   * @return true if a record was read, false once {@code total} records have been returned
-   * @throws ParquetDecodingException wrapping any RuntimeException raised while decoding
-   */
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (current < total) {
-      try {
-        checkRead();
-        currentValue = recordReader.read();
-        if (DEBUG) LOG.debug("read value: " + currentValue);
-        current ++;
-      } catch (RuntimeException e) {
-        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
-      }
-      return true;
-    }
-    return false;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
deleted file mode 100644
index 7410d2b..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.lang.String.format;
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.util.Map;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.schema.MessageType;
-
-class InternalParquetRecordWriter<T> {
- private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
-
- private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
- private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
- private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
- private final ParquetFileWriter w;
- private final WriteSupport<T> writeSupport;
- private final MessageType schema;
- private final Map<String, String> extraMetaData;
- private final int blockSize;
- private final int pageSize;
- private final BytesCompressor compressor;
- private final int dictionaryPageSize;
- private final boolean enableDictionary;
- private final boolean validating;
- private final WriterVersion writerVersion;
-
- private long recordCount = 0;
- private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
-
- private ColumnWriteStoreImpl store;
- private ColumnChunkPageWriteStore pageStore;
-
- /**
- * @param w the file to write to
- * @param writeSupport the class to convert incoming records
- * @param schema the schema of the records
- * @param extraMetaData extra meta data to write in the footer of the file
- * @param blockSize the size of a block in the file (this will be approximate)
- * @param codec the codec used to compress
- */
- public InternalParquetRecordWriter(
- ParquetFileWriter w,
- WriteSupport<T> writeSupport,
- MessageType schema,
- Map<String, String> extraMetaData,
- int blockSize,
- int pageSize,
- BytesCompressor compressor,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- WriterVersion writerVersion) {
- this.w = w;
- this.writeSupport = checkNotNull(writeSupport, "writeSupport");
- this.schema = schema;
- this.extraMetaData = extraMetaData;
- this.blockSize = blockSize;
- this.pageSize = pageSize;
- this.compressor = compressor;
- this.dictionaryPageSize = dictionaryPageSize;
- this.enableDictionary = enableDictionary;
- this.validating = validating;
- this.writerVersion = writerVersion;
- initStore();
- }
-
- private void initStore() {
- // we don't want this number to be too small
- // ideally we divide the block equally across the columns
- // it is unlikely all columns are going to be the same size.
- int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
- pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
- // we don't want this number to be too small either
- // ideally, slightly bigger than the page size, but not bigger than the block buffer
- int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
- store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
- MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
- writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
- }
-
- public void close() throws IOException, InterruptedException {
- flushStore();
- w.end(extraMetaData);
- }
-
- public void write(T value) throws IOException, InterruptedException {
- writeSupport.write(value);
- ++ recordCount;
- checkBlockSizeReached();
- }
-
- private void checkBlockSizeReached() throws IOException {
- if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
- long memSize = store.memSize();
- if (memSize > blockSize) {
- LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
- flushStore();
- initStore();
- recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
- } else {
- float recordSize = (float) memSize / recordCount;
- recordCountForNextMemCheck = min(
- max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
- recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
- );
- if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
- }
- }
- }
-
- public long getEstimatedWrittenSize() throws IOException {
- return w.getPos() + store.memSize();
- }
-
- private void flushStore()
- throws IOException {
- LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
- if (store.allocatedSize() > 3 * blockSize) {
- LOG.warn("Too much memory used: " + store.memUsageString());
- }
- w.startBlock(recordCount);
- store.flush();
- pageStore.flushToFileWriter(w);
- recordCount = 0;
- w.endBlock();
- store = null;
- pageStore = null;
- }
-}
\ No newline at end of file