Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:19 UTC
[04/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
new file mode 100644
index 0000000..1448885
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ByteBufLineReader implements Closeable {
+ private static int DEFAULT_BUFFER = 64 * 1024;
+
+ private int bufferSize;
+ private long readBytes;
+ private ByteBuf buffer;
+ private final ByteBufInputChannel channel;
+ private final AtomicInteger tempReadBytes = new AtomicInteger();
+ private final LineSplitProcessor processor = new LineSplitProcessor();
+
+ public ByteBufLineReader(ByteBufInputChannel channel) {
+ this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
+ }
+
+ public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
+ this.readBytes = 0;
+ this.channel = channel;
+ this.buffer = buf;
+ this.bufferSize = buf.capacity();
+ }
+
+ public long readBytes() {
+ return readBytes - buffer.readableBytes();
+ }
+
+ public long available() throws IOException {
+ return channel.available() + buffer.readableBytes();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.buffer.refCnt() > 0) {
+ this.buffer.release();
+ }
+ this.channel.close();
+ }
+
+ public String readLine() throws IOException {
+ ByteBuf buf = readLineBuf(tempReadBytes);
+ if (buf != null) {
+ return buf.toString(CharsetUtil.UTF_8);
+ }
+ return null;
+ }
+
+ private void fillBuffer() throws IOException {
+
+ int tailBytes = 0;
+ if (this.readBytes > 0) {
+ this.buffer.markReaderIndex();
+ this.buffer.discardSomeReadBytes(); // compact the buffer
+ tailBytes = this.buffer.writerIndex();
+ if (!this.buffer.isWritable()) {
+ // a single line is larger than the buffer, so grow it
+ BufferPool.ensureWritable(buffer, bufferSize);
+ this.bufferSize = buffer.capacity();
+ }
+ }
+
+ boolean release = true;
+ try {
+ int readBytes = tailBytes;
+ for (; ; ) {
+ int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes);
+ if (localReadBytes < 0) {
+ break;
+ }
+ readBytes += localReadBytes;
+ if (readBytes == bufferSize) {
+ break;
+ }
+ }
+ this.readBytes += (readBytes - tailBytes);
+ release = false;
+ this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
+ } finally {
+ if (release) {
+ buffer.release();
+ }
+ }
+ }
+
+ /**
+ * Read a line terminated by one of CR, LF, or CRLF.
+ */
+ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
+ int startIndex = buffer.readerIndex();
+ int readBytes;
+ int readable;
+ int newlineLength; //length of terminating newline
+
+ loop:
+ while (true) {
+ readable = buffer.readableBytes();
+ if (readable <= 0) {
+ buffer.readerIndex(startIndex);
+ fillBuffer(); //compact and fill buffer
+ if (!buffer.isReadable()) {
+ return null;
+ } else {
+ startIndex = 0; // reset the line start position
+ }
+ readable = buffer.readableBytes();
+ }
+
+ int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
+ if (endIndex < 0) {
+ buffer.readerIndex(buffer.writerIndex());
+ } else {
+ buffer.readerIndex(endIndex + 1);
+ readBytes = buffer.readerIndex() - startIndex;
+ if (processor.isPrevCharCR() && buffer.isReadable()
+ && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+ buffer.skipBytes(1);
+ newlineLength = 2;
+ } else {
+ newlineLength = 1;
+ }
+ break loop;
+ }
+ }
+ reads.set(readBytes);
+ return buffer.slice(startIndex, readBytes - newlineLength);
+ }
+}
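
For readers skimming the new ByteBufLineReader API above, a minimal usage sketch (not part of this patch; the local file and wrapper class are illustrative only, the constructors and readLine() behavior are taken from the code above):

import java.io.FileInputStream;

import org.apache.tajo.storage.ByteBufInputChannel;
import org.apache.tajo.storage.text.ByteBufLineReader;

public class ByteBufLineReaderSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical local file; any InputStream can back the channel.
    FileInputStream in = new FileInputStream(args[0]);
    ByteBufInputChannel channel = new ByteBufInputChannel(in);
    ByteBufLineReader lineReader = new ByteBufLineReader(channel); // 64KB direct buffer by default
    try {
      String line;
      while ((line = lineReader.readLine()) != null) { // null at end of stream
        System.out.println(line);                      // CR/LF/CRLF terminator is stripped
      }
    } finally {
      lineReader.close(); // releases the pooled direct buffer and closes the channel
    }
  }
}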
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
new file mode 100644
index 0000000..10d86bd
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DelimitedLineReader implements Closeable {
+ private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
+ private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
+
+ private FileSystem fs;
+ private FSDataInputStream fis;
+ private InputStream is; // decompressed stream
+ private CompressionCodecFactory factory;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+
+ private long startOffset, end, pos;
+ private boolean eof = true;
+ private ByteBufLineReader lineReader;
+ private AtomicInteger tempReadBytes = new AtomicInteger();
+ private FileFragment fragment;
+ private Configuration conf;
+
+ public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
+ this.fragment = fragment;
+ this.conf = conf;
+ this.factory = new CompressionCodecFactory(conf);
+ this.codec = factory.getCodec(fragment.getPath());
+ if (this.codec instanceof SplittableCompressionCodec) {
+ throw new NotImplementedException(); // bzip2 does not support the multi-threaded model
+ }
+ }
+
+ public void init() throws IOException {
+ if (fs == null) {
+ fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
+ }
+ if (fis == null) fis = fs.open(fragment.getPath());
+ pos = startOffset = fragment.getStartKey();
+ end = startOffset + fragment.getLength();
+
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ ByteBufInputChannel channel = new ByteBufInputChannel(is);
+ lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
+ } else {
+ fis.seek(startOffset);
+ is = fis;
+
+ ByteBufInputChannel channel = new ByteBufInputChannel(is);
+ lineReader = new ByteBufLineReader(channel,
+ BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
+ }
+ eof = false;
+ }
+
+ public long getCompressedPosition() throws IOException {
+ long retVal;
+ if (isCompressed()) {
+ retVal = fis.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+
+ public long getUnCompressedPosition() throws IOException {
+ return pos;
+ }
+
+ public long getReadBytes() {
+ return pos - startOffset;
+ }
+
+ public boolean isReadable() {
+ return !eof;
+ }
+
+ public ByteBuf readLine() throws IOException {
+ if (eof) {
+ return null;
+ }
+
+ ByteBuf buf = lineReader.readLineBuf(tempReadBytes);
+ if (buf == null) {
+ eof = true;
+ } else {
+ pos += tempReadBytes.get();
+ }
+
+ if (!isCompressed() && getCompressedPosition() > end) {
+ eof = true;
+ }
+ return buf;
+ }
+
+ public boolean isCompressed() {
+ return codec != null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ IOUtils.cleanup(LOG, lineReader, is, fis);
+ fs = null;
+ is = null;
+ fis = null;
+ lineReader = null;
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+}
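
A sketch of how DelimitedLineReader is typically driven over a file fragment (illustrative only; the FileFragment(tableName, path, start, length) constructor is assumed from the existing storage API, and the class name is hypothetical):

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.text.DelimitedLineReader;

public class DelimitedLineReaderSketch {
  public static void main(String[] args) throws Exception {
    TajoConf conf = new TajoConf();
    Path path = new Path(args[0]);
    long length = path.getFileSystem(conf).getFileStatus(path).getLen();

    // Assumed FileFragment(tableName, path, startOffset, length) constructor.
    FileFragment fragment = new FileFragment("sample", path, 0, length);

    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
    reader.init(); // opens the file and, for compressed input, the decompression stream
    try {
      ByteBuf line;
      while (reader.isReadable() && (line = reader.readLine()) != null) {
        // each ByteBuf is a slice of the reader's internal buffer; consume it before the next call
        System.out.println(line.toString(CharsetUtil.UTF_8));
      }
    } finally {
      reader.close(); // returns any decompressor to the codec pool
    }
  }
}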
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
new file mode 100644
index 0000000..a337509
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class DelimitedTextFile {
+
+ public static final byte LF = '\n';
+ public static int EOF = -1;
+
+ private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
+
+ public static class DelimitedTextFileAppender extends FileAppender {
+ private final TableMeta meta;
+ private final Schema schema;
+ private final int columnNum;
+ private final FileSystem fs;
+ private FSDataOutputStream fos;
+ private DataOutputStream outputStream;
+ private CompressionOutputStream deflateFilter;
+ private char delimiter;
+ private TableStatistics stats = null;
+ private Compressor compressor;
+ private CompressionCodecFactory codecFactory;
+ private CompressionCodec codec;
+ private Path compressedPath;
+ private byte[] nullChars;
+ private int BUFFER_SIZE = 128 * 1024;
+ private int bufferedBytes = 0;
+ private long pos = 0;
+
+ private NonSyncByteArrayOutputStream os;
+ private FieldSerializerDeserializer serde;
+
+ public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ final Schema schema, final TableMeta meta, final Path path)
+ throws IOException {
+ super(conf, taskAttemptId, schema, meta, path);
+ this.fs = path.getFileSystem(conf);
+ this.meta = meta;
+ this.schema = schema;
+ this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
+ StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+ this.columnNum = schema.size();
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
+ NullDatum.DEFAULT_TEXT));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.toString());
+ }
+
+ if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+ String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+ codecFactory = new CompressionCodecFactory(conf);
+ codec = codecFactory.getCodecByClassName(codecName);
+ compressor = CodecPool.getCompressor(codec);
+ if (compressor != null) compressor.reset(); // the built-in (non-native) gzip codec returns a null compressor
+
+ String extension = codec.getDefaultExtension();
+ compressedPath = path.suffix(extension);
+
+ if (fs.exists(compressedPath)) {
+ throw new AlreadyExistsStorageException(compressedPath);
+ }
+
+ fos = fs.create(compressedPath);
+ deflateFilter = codec.createOutputStream(fos, compressor);
+ outputStream = new DataOutputStream(deflateFilter);
+
+ } else {
+ if (fs.exists(path)) {
+ throw new AlreadyExistsStorageException(path);
+ }
+ fos = fs.create(path);
+ outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+ }
+
+ if (enabledStats) {
+ this.stats = new TableStatistics(this.schema);
+ }
+
+ serde = new TextFieldSerializerDeserializer();
+
+ if (os == null) {
+ os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+ }
+
+ os.reset();
+ pos = fos.getPos();
+ bufferedBytes = 0;
+ super.init();
+ }
+
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ Datum datum;
+ int rowBytes = 0;
+
+ for (int i = 0; i < columnNum; i++) {
+ datum = tuple.get(i);
+ rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars);
+
+ if (columnNum - 1 > i) {
+ os.write((byte) delimiter);
+ rowBytes += 1;
+ }
+ }
+ os.write(LF);
+ rowBytes += 1;
+
+ pos += rowBytes;
+ bufferedBytes += rowBytes;
+ if (bufferedBytes > BUFFER_SIZE) {
+ flushBuffer();
+ }
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
+ private void flushBuffer() throws IOException {
+ if (os.getLength() > 0) {
+ os.writeTo(outputStream);
+ os.reset();
+ bufferedBytes = 0;
+ }
+ }
+
+ @Override
+ public long getOffset() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushBuffer();
+ outputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ try {
+ if(outputStream != null){
+ flush();
+ }
+
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+
+ if (deflateFilter != null) {
+ deflateFilter.finish();
+ deflateFilter.resetState();
+ deflateFilter = null;
+ }
+
+ os.close();
+ } finally {
+ IOUtils.cleanup(LOG, fos);
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean isCompress() {
+ return compressor != null;
+ }
+
+ public String getExtension() {
+ return codec != null ? codec.getDefaultExtension() : "";
+ }
+ }
+
+ public static class DelimitedTextFileScanner extends FileScanner {
+
+ private boolean splittable = false;
+ private final long startOffset;
+ private final long endOffset;
+
+ private int recordCount = 0;
+ private int[] targetColumnIndexes;
+
+ private ByteBuf nullChars;
+ private FieldSerializerDeserializer serde;
+ private DelimitedLineReader reader;
+ private FieldSplitProcessor processor;
+
+ public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
+ final Fragment fragment)
+ throws IOException {
+ super(conf, schema, meta, fragment);
+ reader = new DelimitedLineReader(conf, this.fragment);
+ if (!reader.isCompressed()) {
+ splittable = true;
+ }
+
+ startOffset = this.fragment.getStartKey();
+ endOffset = startOffset + fragment.getLength();
+
+ //Delimiter
+ String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (nullChars != null) {
+ nullChars.release();
+ }
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
+ NullDatum.DEFAULT_TEXT));
+ byte[] bytes;
+ if (StringUtils.isEmpty(nullCharacters)) {
+ bytes = NullDatum.get().asTextBytes();
+ } else {
+ bytes = nullCharacters.getBytes();
+ }
+
+ nullChars = BufferPool.directBuffer(bytes.length, bytes.length);
+ nullChars.writeBytes(bytes);
+
+ if (reader != null) {
+ reader.close();
+ }
+ reader = new DelimitedLineReader(conf, fragment);
+ reader.init();
+ recordCount = 0;
+
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
+ }
+
+ serde = new TextFieldSerializerDeserializer();
+
+ super.init();
+ Arrays.sort(targetColumnIndexes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
+ }
+
+ if (startOffset > 0) {
+ reader.readLine(); // skip the first (possibly partial) line of a non-first split
+ }
+ }
+
+ public ByteBuf readLine() throws IOException {
+ ByteBuf buf = reader.readLine();
+ if (buf == null) {
+ return null;
+ } else {
+ recordCount++;
+ }
+
+ return buf;
+ }
+
+ @Override
+ public float getProgress() {
+ try {
+ if (!reader.isReadable()) {
+ return 1.0f;
+ }
+ long filePos = reader.getCompressedPosition();
+ if (startOffset == filePos) {
+ return 0.0f;
+ } else {
+ long readBytes = filePos - startOffset;
+ long remainingBytes = Math.max(endOffset - filePos, 0);
+ return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return 0.0f;
+ }
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ try {
+ if (!reader.isReadable()) return null;
+
+ ByteBuf buf = readLine();
+ if (buf == null) return null;
+
+ if (targets.length == 0) {
+ return EmptyTuple.get();
+ }
+
+ VTuple tuple = new VTuple(schema.size());
+ fillTuple(schema, tuple, buf, targetColumnIndexes);
+ return tuple;
+ } catch (Throwable t) {
+ LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
+ throw new IOException(t);
+ }
+ }
+
+ private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException {
+ int[] projection = target;
+ if (lineBuf == null || target == null || target.length == 0) {
+ return;
+ }
+
+ final int rowLength = lineBuf.readableBytes();
+ int start = 0, fieldLength = 0, end = 0;
+
+ //Projection
+ int currentTarget = 0;
+ int currentIndex = 0;
+
+ while (end != -1) {
+ end = lineBuf.forEachByte(start, rowLength - start, processor);
+
+ if (end < 0) {
+ fieldLength = rowLength - start;
+ } else {
+ fieldLength = end - start;
+ }
+
+ if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
+ lineBuf.setIndex(start, start + fieldLength);
+ Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+ dst.put(currentIndex, datum);
+ currentTarget++;
+ }
+
+ if (projection.length == currentTarget) {
+ break;
+ }
+
+ start = end + 1;
+ currentIndex++;
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ init();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (nullChars != null) {
+ nullChars.release();
+ nullChars = null;
+ }
+
+ if (tableStats != null && reader != null) {
+ tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
+ tableStats.setNumRows(recordCount);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, reader);
+ reader = null;
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return splittable;
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (tableStats != null && reader != null) {
+ tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
+ tableStats.setNumRows(recordCount);
+ tableStats.setNumBytes(fragment.getLength());
+ }
+ return tableStats;
+ }
+ }
+}
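
A hedged sketch of the scanner's read loop (the schema, meta, and fragment are assumed to come from the catalog and split planner; the helper class and method are illustrative, the scanner API is as defined above):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.text.DelimitedTextFile;

public class DelimitedTextScanSketch {
  // schema, meta, and fragment are assumed to come from the catalog and the split planner.
  public static long countRows(Configuration conf, Schema schema, TableMeta meta,
                               FileFragment fragment) throws IOException {
    DelimitedTextFile.DelimitedTextFileScanner scanner =
        new DelimitedTextFile.DelimitedTextFileScanner(conf, schema, meta, fragment);
    scanner.init(); // resolves null text and target columns; skips the first line of non-first splits
    long rows = 0;
    try {
      Tuple tuple;
      while ((tuple = scanner.next()) != null) { // null once the fragment is exhausted
        rows++;
      }
    } finally {
      scanner.close(); // releases the null-chars buffer and the underlying line reader
    }
    return rows;
  }
}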
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
new file mode 100644
index 0000000..a5ac142
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class FieldSplitProcessor implements ByteBufProcessor {
+ private char delimiter; // the ASCII field delimiter character
+
+ public FieldSplitProcessor(char recordDelimiterByte) {
+ this.delimiter = recordDelimiterByte;
+ }
+
+ @Override
+ public boolean process(byte value) throws Exception {
+ return delimiter != value;
+ }
+
+ public char getDelimiter() {
+ return delimiter;
+ }
+}
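
The processor is meant to be passed to ByteBuf.forEachByte, which stops at the first byte for which process() returns false. A small illustrative sketch mirroring the field-splitting loop in DelimitedTextFileScanner.fillTuple (the wrapper class is hypothetical):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

import org.apache.tajo.storage.text.FieldSplitProcessor;

public class FieldSplitSketch {
  public static void main(String[] args) {
    ByteBuf row = Unpooled.copiedBuffer("1|tajo|2014", CharsetUtil.UTF_8);
    FieldSplitProcessor processor = new FieldSplitProcessor('|');

    int rowLength = row.readableBytes();
    int start = 0;
    int end;
    do {
      // forEachByte stops at the first byte for which process() returns false,
      // i.e. the next delimiter; -1 means the last field runs to the end of the row.
      end = row.forEachByte(start, rowLength - start, processor);
      int fieldLength = (end < 0) ? rowLength - start : end - start;
      System.out.println(row.toString(start, fieldLength, CharsetUtil.UTF_8));
      start = end + 1;
    } while (end >= 0);
    row.release();
  }
}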
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
new file mode 100644
index 0000000..a130527
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class LineSplitProcessor implements ByteBufProcessor {
+ public static final byte CR = '\r';
+ public static final byte LF = '\n';
+ private boolean prevCharCR = false; // true if the previous char was CR
+
+ @Override
+ public boolean process(byte value) throws Exception {
+ switch (value) {
+ case LF:
+ return false;
+ case CR:
+ prevCharCR = true;
+ return false;
+ default:
+ prevCharCR = false;
+ return true;
+ }
+ }
+
+ public boolean isPrevCharCR() {
+ return prevCharCR;
+ }
+}
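
LineSplitProcessor stops forEachByte on either CR or LF; the CRLF case is resolved by the caller, which peeks at the byte after a CR (see ByteBufLineReader.readLineBuf above). An illustrative sketch (the wrapper class is hypothetical):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

import org.apache.tajo.storage.text.LineSplitProcessor;

public class LineSplitSketch {
  public static void main(String[] args) {
    ByteBuf buf = Unpooled.copiedBuffer("first\r\nsecond\n", CharsetUtil.UTF_8);
    LineSplitProcessor processor = new LineSplitProcessor();

    // forEachByte stops on CR or LF; here it returns the index of the '\r'.
    int end = buf.forEachByte(buf.readerIndex(), buf.readableBytes(), processor);
    System.out.println(buf.toString(0, end, CharsetUtil.UTF_8)); // "first"

    // After a CR the caller peeks at the next byte and, when it is the LF of a
    // CRLF pair, skips it so the newline length becomes 2 (see readLineBuf above).
    boolean crlf = processor.isPrevCharCR() && buf.getByte(end + 1) == LineSplitProcessor.LF;
    System.out.println(crlf); // true
    buf.release();
  }
}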
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
new file mode 100644
index 0000000..9722959
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.CharsetDecoder;
+
+//Compatibility with Apache Hive
+public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
+ public static final byte[] trueBytes = "true".getBytes();
+ public static final byte[] falseBytes = "false".getBytes();
+ private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+ private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
+
+ private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
+ return !val.isReadable() || nullBytes.equals(val);
+ }
+
+ private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
+ return val.readableBytes() > 0 && nullBytes.equals(val);
+ }
+
+ @Override
+ public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException {
+ byte[] bytes;
+ int length = 0;
+ TajoDataTypes.DataType dataType = col.getDataType();
+
+ if (datum == null || datum instanceof NullDatum) {
+ switch (dataType.getType()) {
+ case CHAR:
+ case TEXT:
+ length = nullChars.length;
+ out.write(nullChars);
+ break;
+ default:
+ break;
+ }
+ return length;
+ }
+
+ switch (dataType.getType()) {
+ case BOOLEAN:
+ out.write(datum.asBool() ? trueBytes : falseBytes);
+ length = trueBytes.length;
+ break;
+ case CHAR:
+ byte[] pad = new byte[dataType.getLength() - datum.size()];
+ bytes = datum.asTextBytes();
+ out.write(bytes);
+ out.write(pad);
+ length = bytes.length + pad.length;
+ break;
+ case TEXT:
+ case BIT:
+ case INT2:
+ case INT4:
+ case INT8:
+ case FLOAT4:
+ case FLOAT8:
+ case INET4:
+ case DATE:
+ case INTERVAL:
+ bytes = datum.asTextBytes();
+ length = bytes.length;
+ out.write(bytes);
+ break;
+ case TIME:
+ bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes();
+ length = bytes.length;
+ out.write(bytes);
+ break;
+ case TIMESTAMP:
+ bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes();
+ length = bytes.length;
+ out.write(bytes);
+ break;
+ case INET6:
+ case BLOB:
+ bytes = Base64.encodeBase64(datum.asByteArray(), false);
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case PROTOBUF:
+ ProtobufDatum protobuf = (ProtobufDatum) datum;
+ byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+ length = protoBytes.length;
+ out.write(protoBytes, 0, protoBytes.length);
+ break;
+ case NULL_TYPE:
+ default:
+ break;
+ }
+ return length;
+ }
+
+ @Override
+ public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
+ Datum datum;
+ TajoDataTypes.Type type = col.getDataType().getType();
+ boolean nullField;
+ if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
+ nullField = isNullText(buf, nullChars);
+ } else {
+ nullField = isNull(buf, nullChars);
+ }
+
+ if (nullField) {
+ datum = NullDatum.get();
+ } else {
+ switch (type) {
+ case BOOLEAN:
+ byte bool = buf.readByte();
+ datum = DatumFactory.createBool(bool == 't' || bool == 'T');
+ break;
+ case BIT:
+ datum = DatumFactory.createBit(Byte.parseByte(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
+ break;
+ case CHAR:
+ datum = DatumFactory.createChar(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
+ break;
+ case INT1:
+ case INT2:
+ datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
+ break;
+ case INT4:
+ datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
+ break;
+ case INT8:
+ datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
+ break;
+ case FLOAT4:
+ datum = DatumFactory.createFloat4(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case FLOAT8:
+ datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
+ break;
+ case TEXT: {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ datum = DatumFactory.createText(bytes);
+ break;
+ }
+ case DATE:
+ datum = DatumFactory.createDate(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case TIME:
+ datum = DatumFactory.createTime(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case TIMESTAMP:
+ datum = DatumFactory.createTimestamp(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case INTERVAL:
+ datum = DatumFactory.createInterval(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case PROTOBUF: {
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+ Message.Builder builder = factory.newBuilder();
+ try {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ protobufJsonFormat.merge(bytes, builder);
+ datum = factory.createDatum(builder.build());
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ break;
+ }
+ case INET4:
+ datum = DatumFactory.createInet4(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case BLOB: {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
+ break;
+ }
+ default:
+ datum = NullDatum.get();
+ break;
+ }
+ }
+ return datum;
+ }
+}
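
A round-trip sketch for the serializer/deserializer above (illustrative only; the Column(name, type) constructor and the default null-text marker are assumptions about the catalog API, and the class name is hypothetical):

import java.io.ByteArrayOutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.tajo.catalog.Column;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.text.TextFieldSerializerDeserializer;

public class TextSerDeSketch {
  public static void main(String[] args) throws Exception {
    TextFieldSerializerDeserializer serde = new TextFieldSerializerDeserializer();
    Column column = new Column("id", TajoDataTypes.Type.INT4); // assumed Column(name, type) constructor
    byte[] nullChars = NullDatum.get().asTextBytes();          // assumed default textual null marker

    // serialize a single INT4 field to its text form
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int written = serde.serialize(out, DatumFactory.createInt4(42), column, 0, nullChars);

    // deserialize the same bytes back into a Datum
    ByteBuf field = Unpooled.wrappedBuffer(out.toByteArray());
    ByteBuf nullBuf = Unpooled.wrappedBuffer(nullChars);
    Datum datum = serde.deserialize(field, column, 0, nullBuf);
    System.out.println(written + " bytes -> " + datum.asInt4()); // 2 bytes -> 42
  }
}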
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
new file mode 100644
index 0000000..f76593e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.BadConfigurationException;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+class CodecFactory {
+
+ public class BytesDecompressor {
+
+ private final CompressionCodec codec;
+ private final Decompressor decompressor;
+
+ public BytesDecompressor(CompressionCodec codec) {
+ this.codec = codec;
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ } else {
+ decompressor = null;
+ }
+ }
+
+ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+ final BytesInput decompressed;
+ if (codec != null) {
+ decompressor.reset();
+ InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
+ decompressed = BytesInput.from(is, uncompressedSize);
+ } else {
+ decompressed = bytes;
+ }
+ return decompressed;
+ }
+
+ private void release() {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ }
+ }
+ }
+
+ /**
+ * Encapsulates the logic around hadoop compression
+ *
+ * @author Julien Le Dem
+ *
+ */
+ public static class BytesCompressor {
+
+ private final CompressionCodec codec;
+ private final Compressor compressor;
+ private final ByteArrayOutputStream compressedOutBuffer;
+ private final CompressionCodecName codecName;
+
+ public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
+ this.codecName = codecName;
+ this.codec = codec;
+ if (codec != null) {
+ this.compressor = CodecPool.getCompressor(codec);
+ this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
+ } else {
+ this.compressor = null;
+ this.compressedOutBuffer = null;
+ }
+ }
+
+ public BytesInput compress(BytesInput bytes) throws IOException {
+ final BytesInput compressedBytes;
+ if (codec == null) {
+ compressedBytes = bytes;
+ } else {
+ compressedOutBuffer.reset();
+ if (compressor != null) {
+ // null compressor for non-native gzip
+ compressor.reset();
+ }
+ CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
+ bytes.writeAllTo(cos);
+ cos.finish();
+ cos.close();
+ compressedBytes = BytesInput.from(compressedOutBuffer);
+ }
+ return compressedBytes;
+ }
+
+ private void release() {
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ }
+ }
+
+ public CompressionCodecName getCodecName() {
+ return codecName;
+ }
+
+ }
+
+ private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
+ private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+ private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
+ private final Configuration configuration;
+
+ public CodecFactory(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ *
+ * @param codecName the requested codec
+ * @return the corresponding hadoop codec. null if UNCOMPRESSED
+ */
+ private CompressionCodec getCodec(CompressionCodecName codecName) {
+ String codecClassName = codecName.getHadoopCompressionCodecClassName();
+ if (codecClassName == null) {
+ return null;
+ }
+ CompressionCodec codec = codecByName.get(codecClassName);
+ if (codec != null) {
+ return codec;
+ }
+
+ try {
+ Class<?> codecClass = Class.forName(codecClassName);
+ codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
+ codecByName.put(codecClassName, codec);
+ return codec;
+ } catch (ClassNotFoundException e) {
+ throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
+ }
+ }
+
+ public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
+ BytesCompressor comp = compressors.get(codecName);
+ if (comp == null) {
+ CompressionCodec codec = getCodec(codecName);
+ comp = new BytesCompressor(codecName, codec, pageSize);
+ compressors.put(codecName, comp);
+ }
+ return comp;
+ }
+
+ public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+ BytesDecompressor decomp = decompressors.get(codecName);
+ if (decomp == null) {
+ CompressionCodec codec = getCodec(codecName);
+ decomp = new BytesDecompressor(codec);
+ decompressors.put(codecName, decomp);
+ }
+ return decomp;
+ }
+
+ public void release() {
+ for (BytesCompressor compressor : compressors.values()) {
+ compressor.release();
+ }
+ compressors.clear();
+ for (BytesDecompressor decompressor : decompressors.values()) {
+ decompressor.release();
+ }
+ decompressors.clear();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..0dedd9b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.bytes.CapacityByteArrayOutputStream;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.BooleanStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.io.ParquetEncodingException;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
+import static parquet.Log.INFO;
+
+class ColumnChunkPageWriteStore implements PageWriteStore {
+ private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
+
+ private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+ private static final class ColumnChunkPageWriter implements PageWriter {
+
+ private final ColumnDescriptor path;
+ private final BytesCompressor compressor;
+
+ private final CapacityByteArrayOutputStream buf;
+ private DictionaryPage dictionaryPage;
+
+ private long uncompressedLength;
+ private long compressedLength;
+ private long totalValueCount;
+ private int pageCount;
+
+ private Set<Encoding> encodings = new HashSet<Encoding>();
+
+ private Statistics totalStatistics;
+
+ private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
+ this.path = path;
+ this.compressor = compressor;
+ this.buf = new CapacityByteArrayOutputStream(initialSize);
+ this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
+ }
+
+ @Deprecated
+ @Override
+ public void writePage(BytesInput bytes,
+ int valueCount,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ long uncompressedSize = bytes.size();
+ BytesInput compressedBytes = compressor.compress(bytes);
+ long compressedSize = compressedBytes.size();
+ BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
+ parquetMetadataConverter.writeDataPageHeader(
+ (int)uncompressedSize,
+ (int)compressedSize,
+ valueCount,
+ statistics,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ buf);
+ this.uncompressedLength += uncompressedSize;
+ this.compressedLength += compressedSize;
+ this.totalValueCount += valueCount;
+ this.pageCount += 1;
+ compressedBytes.writeAllTo(buf);
+ encodings.add(rlEncoding);
+ encodings.add(dlEncoding);
+ encodings.add(valuesEncoding);
+ }
+
+ @Override
+ public void writePage(BytesInput bytes,
+ int valueCount,
+ Statistics statistics,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ long uncompressedSize = bytes.size();
+ BytesInput compressedBytes = compressor.compress(bytes);
+ long compressedSize = compressedBytes.size();
+ parquetMetadataConverter.writeDataPageHeader(
+ (int)uncompressedSize,
+ (int)compressedSize,
+ valueCount,
+ statistics,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ buf);
+ this.uncompressedLength += uncompressedSize;
+ this.compressedLength += compressedSize;
+ this.totalValueCount += valueCount;
+ this.pageCount += 1;
+ this.totalStatistics.mergeStatistics(statistics);
+ compressedBytes.writeAllTo(buf);
+ encodings.add(rlEncoding);
+ encodings.add(dlEncoding);
+ encodings.add(valuesEncoding);
+ }
+
+ @Override
+ public long getMemSize() {
+ return buf.size();
+ }
+
+ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
+ writer.startColumn(path, totalValueCount, compressor.getCodecName());
+ if (dictionaryPage != null) {
+ writer.writeDictionaryPage(dictionaryPage);
+ encodings.add(dictionaryPage.getEncoding());
+ }
+ writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+ writer.endColumn();
+ if (INFO) {
+ LOG.info(
+ String.format(
+ "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+ buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
+ + (dictionaryPage != null ? String.format(
+ ", dic { %,d entries, %,dB raw, %,dB comp}",
+ dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+ : ""));
+ }
+ encodings.clear();
+ pageCount = 0;
+ }
+
+ @Override
+ public long allocatedSize() {
+ return buf.getCapacity();
+ }
+
+ @Override
+ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+ if (this.dictionaryPage != null) {
+ throw new ParquetEncodingException("Only one dictionary page is allowed");
+ }
+ BytesInput dictionaryBytes = dictionaryPage.getBytes();
+ int uncompressedSize = (int)dictionaryBytes.size();
+ BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+ this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+ }
+
+ @Override
+ public String memUsageString(String prefix) {
+ return buf.memUsageString(prefix + " ColumnChunkPageWriter");
+ }
+ }
+
+ private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+ private final MessageType schema;
+ private final BytesCompressor compressor;
+ private final int initialSize;
+
+ public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
+ this.compressor = compressor;
+ this.schema = schema;
+ this.initialSize = initialSize;
+ }
+
+ @Override
+ public PageWriter getPageWriter(ColumnDescriptor path) {
+ if (!writers.containsKey(path)) {
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
+ }
+ return writers.get(path);
+ }
+
+ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+ List<ColumnDescriptor> columns = schema.getColumns();
+ for (ColumnDescriptor columnDescriptor : columns) {
+ ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
+ pageWriter.writeToFileWriter(writer);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
new file mode 100644
index 0000000..6bbd7b5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.PageReadStore;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static parquet.Log.DEBUG;
+
+class InternalParquetRecordReader<T> {
+ private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
+
+ private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+
+ private MessageType requestedSchema;
+ private MessageType fileSchema;
+ private int columnCount;
+ private final ReadSupport<T> readSupport;
+
+ private RecordMaterializer<T> recordConverter;
+
+ private T currentValue;
+ private long total;
+ private int current = 0;
+ private int currentBlock = -1;
+ private ParquetFileReader reader;
+ private parquet.io.RecordReader<T> recordReader;
+ private UnboundRecordFilter recordFilter;
+
+ private long totalTimeSpentReadingBytes;
+ private long totalTimeSpentProcessingRecords;
+ private long startedAssemblingCurrentBlockAt;
+
+ private long totalCountLoadedSoFar = 0;
+
+ private Path file;
+
+ /**
+ * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+ */
+ public InternalParquetRecordReader(ReadSupport<T> readSupport) {
+ this(readSupport, null);
+ }
+
+ /**
+ * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+ * @param filter Optional filter for only returning matching records.
+ */
+ public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter
+ filter) {
+ this.readSupport = readSupport;
+ this.recordFilter = filter;
+ }
+
+ private void checkRead() throws IOException {
+ if (current == totalCountLoadedSoFar) {
+ if (current != 0) {
+ long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
+ totalTimeSpentProcessingRecords += timeAssembling;
+ LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+ long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+ long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+ long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+ LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+ }
+
+ LOG.info("at row " + current + ". reading next block");
+ long t0 = System.currentTimeMillis();
+ PageReadStore pages = reader.readNextRowGroup();
+ if (pages == null) {
+ throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
+ }
+ long timeSpentReading = System.currentTimeMillis() - t0;
+ totalTimeSpentReadingBytes += timeSpentReading;
+ BenchmarkCounter.incrementTime(timeSpentReading);
+ LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+ if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+ MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
+ recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+ startedAssemblingCurrentBlockAt = System.currentTimeMillis();
+ totalCountLoadedSoFar += pages.getRowCount();
+ ++ currentBlock;
+ }
+ }
+
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ public T getCurrentValue() throws IOException,
+ InterruptedException {
+ return currentValue;
+ }
+
+ public float getProgress() throws IOException, InterruptedException {
+ return (float) current / total;
+ }
+
+ public void initialize(MessageType requestedSchema, MessageType fileSchema,
+ Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+ Path file, List<BlockMetaData> blocks, Configuration configuration)
+ throws IOException {
+ this.requestedSchema = requestedSchema;
+ this.fileSchema = fileSchema;
+ this.file = file;
+ this.columnCount = this.requestedSchema.getPaths().size();
+ this.recordConverter = readSupport.prepareForRead(
+ configuration, extraMetadata, fileSchema,
+ new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
+
+ List<ColumnDescriptor> columns = requestedSchema.getColumns();
+ reader = new ParquetFileReader(configuration, file, blocks, columns);
+ for (BlockMetaData block : blocks) {
+ total += block.getRowCount();
+ }
+ LOG.info("RecordReader initialized will read a total of " + total + " records.");
+ }
+
+ private boolean contains(GroupType group, String[] path, int index) {
+ if (index == path.length) {
+ return false;
+ }
+ if (group.containsField(path[index])) {
+ Type type = group.getType(path[index]);
+ if (type.isPrimitive()) {
+ return index + 1 == path.length;
+ } else {
+ return contains(type.asGroupType(), path, index + 1);
+ }
+ }
+ return false;
+ }
+
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (current < total) {
+ try {
+ checkRead();
+ currentValue = recordReader.read();
+ if (DEBUG) LOG.debug("read value: " + currentValue);
+ current ++;
+ } catch (RuntimeException e) {
+ throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
+ }
+ return true;
+ }
+ return false;
+ }
+}
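For orientation, the record reader above follows the familiar Hadoop RecordReader contract: checkRead() loads the next row group on demand, nextKeyValue() assembles one record at a time, and getProgress() reports the fraction of rows consumed so far. A minimal sketch of the consuming loop, assuming an already-initialized reader instance (the variable name and record handling below are illustrative, not part of this patch):

    // Illustrative only: typical consumption loop for the record reader above.
    // 'reader' is assumed to be an initialized instance of the reader class
    // defined earlier in this patch; exception handling is omitted for brevity.
    while (reader.nextKeyValue()) {               // checkRead() pulls in the next row group when needed
      Object record = reader.getCurrentValue();   // record assembled from the current row
      float done = reader.getProgress();          // fraction of the total row count read so far
      // ... hand the record to the caller ...
    }
    reader.close();                               // closes the underlying ParquetFileReader
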
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
new file mode 100644
index 0000000..532d9a2
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import parquet.Log;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.String.format;
+import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
+import static parquet.Log.DEBUG;
+import static parquet.Preconditions.checkNotNull;
+
+class InternalParquetRecordWriter<T> {
+ private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
+
+ private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+ private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+
+ private final ParquetFileWriter w;
+ private final WriteSupport<T> writeSupport;
+ private final MessageType schema;
+ private final Map<String, String> extraMetaData;
+ private final int blockSize;
+ private final int pageSize;
+ private final BytesCompressor compressor;
+ private final int dictionaryPageSize;
+ private final boolean enableDictionary;
+ private final boolean validating;
+ private final WriterVersion writerVersion;
+
+ private long recordCount = 0;
+ private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+ private ColumnWriteStoreImpl store;
+ private ColumnChunkPageWriteStore pageStore;
+
+ /**
+ * @param w the file to write to
+ * @param writeSupport the class to convert incoming records
+ * @param schema the schema of the records
+ * @param extraMetaData extra meta data to write in the footer of the file
+ * @param blockSize the size of a block in the file (this will be approximate)
+ * @param compressor the compressor used to compress pages
+ */
+ public InternalParquetRecordWriter(
+ ParquetFileWriter w,
+ WriteSupport<T> writeSupport,
+ MessageType schema,
+ Map<String, String> extraMetaData,
+ int blockSize,
+ int pageSize,
+ BytesCompressor compressor,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ WriterVersion writerVersion) {
+ this.w = w;
+ this.writeSupport = checkNotNull(writeSupport, "writeSupport");
+ this.schema = schema;
+ this.extraMetaData = extraMetaData;
+ this.blockSize = blockSize;
+ this.pageSize = pageSize;
+ this.compressor = compressor;
+ this.dictionaryPageSize = dictionaryPageSize;
+ this.enableDictionary = enableDictionary;
+ this.validating = validating;
+ this.writerVersion = writerVersion;
+ initStore();
+ }
+
+ private void initStore() {
+ // we don't want this number to be too small
+ // ideally we divide the block equally across the columns
+ // it is unlikely all columns are going to be the same size.
+ int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
+ pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
+ // we don't want this number to be too small either
+ // ideally, slightly bigger than the page size, but not bigger than the block buffer
+ int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
+ store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+ MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
+ writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
+ }
+
+ public void close() throws IOException, InterruptedException {
+ flushStore();
+ w.end(extraMetaData);
+ }
+
+ public void write(T value) throws IOException, InterruptedException {
+ writeSupport.write(value);
+ ++ recordCount;
+ checkBlockSizeReached();
+ }
+
+ private void checkBlockSizeReached() throws IOException {
+ if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
+ long memSize = store.memSize();
+ if (memSize > blockSize) {
+ LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
+ flushStore();
+ initStore();
+ recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
+ } else {
+ float recordSize = (float) memSize / recordCount;
+ recordCountForNextMemCheck = min(
+ max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
+ recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
+ );
+ if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
+ }
+ }
+ }
+
+ public long getEstimatedWrittenSize() throws IOException {
+ return w.getPos() + store.memSize();
+ }
+
+ private void flushStore()
+ throws IOException {
+ LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
+ if (store.allocatedSize() > 3 * blockSize) {
+ LOG.warn("Too much memory used: " + store.memUsageString());
+ }
+ w.startBlock(recordCount);
+ store.flush();
+ pageStore.flushToFileWriter(w);
+ recordCount = 0;
+ w.endBlock();
+ store = null;
+ pageStore = null;
+ }
+}
\ No newline at end of file
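The adaptive check in checkBlockSizeReached() above estimates the average record size, schedules the next memory check roughly halfway to the point where the block is expected to fill, and never looks more than MAXIMUM_RECORD_COUNT_FOR_CHECK records ahead, which keeps the relatively expensive memSize() call off the per-record path. A small, self-contained illustration of that arithmetic; the buffered sizes below are invented for the example and are not defaults of this patch:

    // Illustrative arithmetic for the next-memory-check heuristic above.
    // The record count, buffered size, and block size are made up for the example.
    public class MemCheckExample {
      public static void main(String[] args) {
        long recordCount = 1000L;                  // records written since the last flush
        long memSize = 10000000L;                  // bytes currently buffered in the column store
        int blockSize = 128 * 1024 * 1024;         // target row group size in bytes
        float recordSize = (float) memSize / recordCount;             // about 10,000 bytes per record
        long halfway = (recordCount + (long) (blockSize / recordSize)) / 2;
        long next = Math.min(Math.max(100, halfway), recordCount + 10000);
        System.out.println(next);                  // prints 7210: check memory again at record 7210
      }
    }
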
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
new file mode 100644
index 0000000..f1c5368
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.Version;
+import parquet.bytes.BytesInput;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.DictionaryPage;
+import parquet.column.statistics.Statistics;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.metadata.*;
+import parquet.io.ParquetEncodingException;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.Map.Entry;
+
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
+/**
+ * Internal implementation of the Parquet file writer as a block container
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetFileWriter {
+ private static final Log LOG = Log.getLog(ParquetFileWriter.class);
+
+ public static final String PARQUET_METADATA_FILE = "_metadata";
+ public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
+ public static final int CURRENT_VERSION = 1;
+
+ private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+
+ private final MessageType schema;
+ private final FSDataOutputStream out;
+ private BlockMetaData currentBlock;
+ private ColumnChunkMetaData currentColumn;
+ private long currentRecordCount;
+ private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ private long uncompressedLength;
+ private long compressedLength;
+ private Set<parquet.column.Encoding> currentEncodings;
+
+ private CompressionCodecName currentChunkCodec;
+ private ColumnPath currentChunkPath;
+ private PrimitiveTypeName currentChunkType;
+ private long currentChunkFirstDataPage;
+ private long currentChunkDictionaryPageOffset;
+ private long currentChunkValueCount;
+
+ private Statistics currentStatistics;
+
+ /**
+ * Captures the order in which methods should be called
+ *
+ * @author Julien Le Dem
+ *
+ */
+ private enum STATE {
+ NOT_STARTED {
+ STATE start() {
+ return STARTED;
+ }
+ },
+ STARTED {
+ STATE startBlock() {
+ return BLOCK;
+ }
+ STATE end() {
+ return ENDED;
+ }
+ },
+ BLOCK {
+ STATE startColumn() {
+ return COLUMN;
+ }
+ STATE endBlock() {
+ return STARTED;
+ }
+ },
+ COLUMN {
+ STATE endColumn() {
+ return BLOCK;
+ };
+ STATE write() {
+ return this;
+ }
+ },
+ ENDED;
+
+ STATE start() throws IOException { return error(); }
+ STATE startBlock() throws IOException { return error(); }
+ STATE startColumn() throws IOException { return error(); }
+ STATE write() throws IOException { return error(); }
+ STATE endColumn() throws IOException { return error(); }
+ STATE endBlock() throws IOException { return error(); }
+ STATE end() throws IOException { return error(); }
+
+ private final STATE error() throws IOException {
+ throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
+ }
+ }
+
+ private STATE state = STATE.NOT_STARTED;
+
+ /**
+ *
+ * @param configuration the Hadoop configuration used to resolve the target file system
+ * @param schema the schema of the data
+ * @param file the file to write to
+ * @throws java.io.IOException if the file can not be created
+ */
+ public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
+ super();
+ this.schema = schema;
+ FileSystem fs = file.getFileSystem(configuration);
+ this.out = fs.create(file, false);
+ }
+
+ /**
+ * start the file
+ * @throws java.io.IOException
+ */
+ public void start() throws IOException {
+ state = state.start();
+ if (DEBUG) LOG.debug(out.getPos() + ": start");
+ out.write(MAGIC);
+ }
+
+ /**
+ * start a block
+ * @param recordCount the record count in this block
+ * @throws java.io.IOException
+ */
+ public void startBlock(long recordCount) throws IOException {
+ state = state.startBlock();
+ if (DEBUG) LOG.debug(out.getPos() + ": start block");
+// out.write(MAGIC); // TODO: add a magic delimiter
+ currentBlock = new BlockMetaData();
+ currentRecordCount = recordCount;
+ }
+
+ /**
+ * start a column inside a block
+ * @param descriptor the column descriptor
+ * @param valueCount the value count in this column
+ * @param compressionCodecName the codec used to compress the pages of this column
+ * @throws java.io.IOException
+ */
+ public void startColumn(ColumnDescriptor descriptor,
+ long valueCount,
+ CompressionCodecName compressionCodecName) throws IOException {
+ state = state.startColumn();
+ if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
+ currentEncodings = new HashSet<parquet.column.Encoding>();
+ currentChunkPath = ColumnPath.get(descriptor.getPath());
+ currentChunkType = descriptor.getType();
+ currentChunkCodec = compressionCodecName;
+ currentChunkValueCount = valueCount;
+ currentChunkFirstDataPage = out.getPos();
+ compressedLength = 0;
+ uncompressedLength = 0;
+ // need to know what type of stats to initialize to
+ // better way to do this?
+ currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
+ }
+
+ /**
+ * writes a dictionary page
+ * @param dictionaryPage the dictionary page
+ */
+ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+ state = state.write();
+ if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
+ currentChunkDictionaryPageOffset = out.getPos();
+ int uncompressedSize = dictionaryPage.getUncompressedSize();
+ int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ out);
+ long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+ this.uncompressedLength += uncompressedSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
+ dictionaryPage.getBytes().writeAllTo(out);
+ currentEncodings.add(dictionaryPage.getEncoding());
+ }
+
+
+ /**
+ * writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ */
+ @Deprecated
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ parquet.column.Encoding rlEncoding,
+ parquet.column.Encoding dlEncoding,
+ parquet.column.Encoding valuesEncoding) throws IOException {
+ state = state.write();
+ long beforeHeader = out.getPos();
+ if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
+ int compressedPageSize = (int)bytes.size();
+ metadataConverter.writeDataPageHeader(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out);
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
+ bytes.writeAllTo(out);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ }
+
+ /**
+ * writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param statistics the statistics for the values in this page
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ */
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics statistics,
+ parquet.column.Encoding rlEncoding,
+ parquet.column.Encoding dlEncoding,
+ parquet.column.Encoding valuesEncoding) throws IOException {
+ state = state.write();
+ long beforeHeader = out.getPos();
+ if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
+ int compressedPageSize = (int)bytes.size();
+ metadataConverter.writeDataPageHeader(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ statistics,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out);
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
+ bytes.writeAllTo(out);
+ currentStatistics.mergeStatistics(statistics);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ }
+
+ /**
+ * writes a number of pages at once
+ * @param bytes bytes to be written including page headers
+ * @param uncompressedTotalPageSize total uncompressed size (without page headers)
+ * @param compressedTotalPageSize total compressed size (without page headers)
+ * @throws java.io.IOException
+ */
+ void writeDataPages(BytesInput bytes,
+ long uncompressedTotalPageSize,
+ long compressedTotalPageSize,
+ Statistics totalStats,
+ List<parquet.column.Encoding> encodings) throws IOException {
+ state = state.write();
+ if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
+ long headersSize = bytes.size() - compressedTotalPageSize;
+ this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+ this.compressedLength += compressedTotalPageSize + headersSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
+ bytes.writeAllTo(out);
+ currentEncodings.addAll(encodings);
+ currentStatistics = totalStats;
+ }
+
+ /**
+ * end a column (once all rep, def and data have been written)
+ * @throws java.io.IOException
+ */
+ public void endColumn() throws IOException {
+ state = state.endColumn();
+ if (DEBUG) LOG.debug(out.getPos() + ": end column");
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ currentChunkPath,
+ currentChunkType,
+ currentChunkCodec,
+ currentEncodings,
+ currentStatistics,
+ currentChunkFirstDataPage,
+ currentChunkDictionaryPageOffset,
+ currentChunkValueCount,
+ compressedLength,
+ uncompressedLength));
+ if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
+ currentColumn = null;
+ this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+ this.uncompressedLength = 0;
+ this.compressedLength = 0;
+ }
+
+ /**
+ * ends a block once all column chunks have been written
+ * @throws java.io.IOException
+ */
+ public void endBlock() throws IOException {
+ state = state.endBlock();
+ if (DEBUG) LOG.debug(out.getPos() + ": end block");
+ currentBlock.setRowCount(currentRecordCount);
+ blocks.add(currentBlock);
+ currentBlock = null;
+ }
+
+ /**
+ * ends a file once all blocks have been written.
+ * closes the file.
+ * @param extraMetaData the extra meta data to write in the footer
+ * @throws java.io.IOException
+ */
+ public void end(Map<String, String> extraMetaData) throws IOException {
+ state = state.end();
+ if (DEBUG) LOG.debug(out.getPos() + ": end");
+ ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+ serializeFooter(footer, out);
+ out.close();
+ }
+
+ private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
+ long footerIndex = out.getPos();
+ parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
+ writeFileMetaData(parquetMetadata, out);
+ if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
+ BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
+ out.write(MAGIC);
+ }
+
+ /**
+ * writes a _metadata file
+ * @param configuration the configuration to use to get the FileSystem
+ * @param outputPath the directory to write the _metadata file to
+ * @param footers the list of footers to merge
+ * @throws java.io.IOException
+ */
+ public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
+ Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
+ FileSystem fs = outputPath.getFileSystem(configuration);
+ outputPath = outputPath.makeQualified(fs);
+ FSDataOutputStream metadata = fs.create(metaDataPath);
+ metadata.write(MAGIC);
+ ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
+ serializeFooter(metadataFooter, metadata);
+ metadata.close();
+ }
+
+ private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
+ String rootPath = root.toString();
+ GlobalMetaData fileMetaData = null;
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ for (Footer footer : footers) {
+ String path = footer.getFile().toString();
+ if (!path.startsWith(rootPath)) {
+ throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root);
+ }
+ path = path.substring(rootPath.length());
+ while (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
+ for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+ block.setPath(path);
+ blocks.add(block);
+ }
+ }
+ return new ParquetMetadata(fileMetaData.merge(), blocks);
+ }
+
+ /**
+ * @return the current position in the underlying file
+ * @throws java.io.IOException
+ */
+ public long getPos() throws IOException {
+ return out.getPos();
+ }
+
+ /**
+ * Will merge the metadata of all the footers together
+ * @param footers the list of file footers to merge
+ * @return the global meta data for all the footers
+ */
+ static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
+ GlobalMetaData fileMetaData = null;
+ for (Footer footer : footers) {
+ ParquetMetadata currentMetadata = footer.getParquetMetadata();
+ fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
+ }
+ return fileMetaData;
+ }
+
+ /**
+ * Will return the result of merging toMerge into mergedMetadata
+ * @param toMerge the metadata to merge
+ * @param mergedMetadata the reference metadata to merge into
+ * @return the result of the merge
+ */
+ static GlobalMetaData mergeInto(
+ FileMetaData toMerge,
+ GlobalMetaData mergedMetadata) {
+ MessageType schema = null;
+ Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
+ Set<String> createdBy = new HashSet<String>();
+ if (mergedMetadata != null) {
+ schema = mergedMetadata.getSchema();
+ newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
+ createdBy.addAll(mergedMetadata.getCreatedBy());
+ }
+ if ((schema == null && toMerge.getSchema() != null)
+ || (schema != null && !schema.equals(toMerge.getSchema()))) {
+ schema = mergeInto(toMerge.getSchema(), schema);
+ }
+ for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
+ Set<String> values = newKeyValues.get(entry.getKey());
+ if (values == null) {
+ values = new HashSet<String>();
+ newKeyValues.put(entry.getKey(), values);
+ }
+ values.add(entry.getValue());
+ }
+ createdBy.add(toMerge.getCreatedBy());
+ return new GlobalMetaData(
+ schema,
+ newKeyValues,
+ createdBy);
+ }
+
+ /**
+ * Will return the result of merging toMerge into mergedSchema
+ * @param toMerge the schema to merge into mergedSchema
+ * @param mergedSchema the schema to append the fields to
+ * @return the resulting schema
+ */
+ static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
+ if (mergedSchema == null) {
+ return toMerge;
+ }
+ return mergedSchema.union(toMerge);
+ }
+
+}
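Since the STATE enum above admits only one call order, a condensed sketch of a legal write sequence may help when reading the callers elsewhere in this patch (InternalParquetRecordWriter drives startBlock/endBlock and end). Every variable below is a placeholder for the example rather than something this file defines:

    // Illustrative call sequence permitted by the state machine above;
    // conf, schema, path, descriptor, pageBytes, stats and the encodings are placeholders.
    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, path);
    writer.start();                                              // NOT_STARTED -> STARTED, writes the PAR1 magic
    writer.startBlock(rowCount);                                 // STARTED -> BLOCK
    writer.startColumn(descriptor, columnValueCount, codecName); // BLOCK -> COLUMN
    writer.writeDataPage(pageValueCount, uncompressedSize, pageBytes, stats,
        rlEncoding, dlEncoding, valuesEncoding);                 // COLUMN -> COLUMN, may repeat per page
    writer.endColumn();                                          // COLUMN -> BLOCK
    writer.endBlock();                                           // BLOCK -> STARTED, repeat from startBlock for more row groups
    writer.end(extraMetaData);                                   // STARTED -> ENDED, serializes the footer and closes the stream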