You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 07:00:00 UTC
[26/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
deleted file mode 100644
index 5ddc3fb..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-public class CSVFile {
-
- public static final byte LF = '\n';
- public static int EOF = -1;
-
- private static final Log LOG = LogFactory.getLog(CSVFile.class);
-
- public static class CSVAppender extends FileAppender {
- private final TableMeta meta;
- private final Schema schema;
- private final int columnNum;
- private final FileSystem fs;
- private FSDataOutputStream fos;
- private DataOutputStream outputStream;
- private CompressionOutputStream deflateFilter;
- private char delimiter;
- private TableStatistics stats = null;
- private Compressor compressor;
- private CompressionCodecFactory codecFactory;
- private CompressionCodec codec;
- private Path compressedPath;
- private byte[] nullChars;
- private int BUFFER_SIZE = 128 * 1024;
- private int bufferedBytes = 0;
- private long pos = 0;
- private boolean isShuffle;
-
- private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
- private SerializerDeserializer serde;
-
-    /**
-     * Creates an appender that writes delimiter-separated text rows to {@code path}.
-     * The field delimiter is the first character of the unescaped TEXT_DELIMITER option and
-     * the null representation comes from the TEXT_NULL option of {@code meta}.
-     */
-    public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
-      super(conf, schema, meta, path);
-      this.fs = path.getFileSystem(conf);
-      this.meta = meta;
-      this.schema = schema;
-
-      String rawDelimiter =
-          this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
-      this.delimiter = StringEscapeUtils.unescapeJava(rawDelimiter).charAt(0);
-
-      this.columnNum = schema.size();
-
-      String rawNullText = this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT);
-      String nullText = StringEscapeUtils.unescapeJava(rawNullText);
-
-      // An empty null text falls back to the text bytes of NullDatum.
-      this.nullChars = StringUtils.isEmpty(nullText) ? NullDatum.get().asTextBytes() : nullText.getBytes();
-    }
-
-    /**
-     * Opens the output file and prepares this appender for {@link #addTuple(Tuple)}.
-     *
-     * Fails if the parent directory is missing or the target file already exists. When the
-     * COMPRESSION_CODEC option is set, output goes to {@code path} plus the codec's default
-     * extension, through the codec's output stream.
-     *
-     * @throws FileNotFoundException if the parent directory of {@code path} does not exist
-     * @throws IOException if the output cannot be created or the serde cannot be instantiated
-     */
-    @Override
-    public void init() throws IOException {
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
-      // Determine the intermediate file type. The name is upper-cased with Locale.ROOT so that the
-      // enum lookup does not depend on the JVM default locale (e.g. Turkish dotted/dotless i).
-      String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
-          TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
-      isShuffle = enabledStats
-          && CatalogProtos.StoreType.CSV
-              == CatalogProtos.StoreType.valueOf(store.toUpperCase(java.util.Locale.ROOT));
-
-      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
-        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
-        codecFactory = new CompressionCodecFactory(conf);
-        codec = codecFactory.getCodecByClassName(codecName);
-        compressor = CodecPool.getCompressor(codec);
-        if (compressor != null) {
-          compressor.reset(); // builtin gzip is null
-        }
-
-        String extension = codec.getDefaultExtension();
-        compressedPath = path.suffix(extension);
-
-        if (fs.exists(compressedPath)) {
-          throw new AlreadyExistsStorageException(compressedPath);
-        }
-
-        fos = fs.create(compressedPath);
-        deflateFilter = codec.createOutputStream(fos, compressor);
-        outputStream = new DataOutputStream(deflateFilter);
-
-      } else {
-        if (fs.exists(path)) {
-          throw new AlreadyExistsStorageException(path);
-        }
-        fos = fs.create(path);
-        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
-      }
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-
-      try {
-        // This will be removed once a custom serde can be configured for text files.
-        String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
-            TextSerializerDeserializer.class.getName());
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
-      // Start from an empty row buffer; pos continues from wherever the raw stream currently is.
-      os.reset();
-      pos = fos.getPos();
-      bufferedBytes = 0;
-      super.init();
-    }
-
-
-    /**
-     * Serializes one tuple as a delimited line terminated by LF into the in-memory buffer,
-     * flushing the buffer to the output stream once more than BUFFER_SIZE bytes are pending.
-     */
-    @Override
-    public void addTuple(Tuple tuple) throws IOException {
-      int rowBytes = 0;
-      final int lastColumn = columnNum - 1;
-
-      for (int col = 0; col < columnNum; col++) {
-        Datum value = tuple.get(col);
-        rowBytes += serde.serialize(schema.getColumn(col), value, os, nullChars);
-
-        // A delimiter follows every column except the last one.
-        if (col < lastColumn) {
-          os.write((byte) delimiter);
-          rowBytes++;
-        }
-        if (isShuffle) {
-          // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(col, value);
-        }
-      }
-      os.write(LF);
-      rowBytes++;
-
-      pos += rowBytes;
-      bufferedBytes += rowBytes;
-      if (bufferedBytes > BUFFER_SIZE) {
-        flushBuffer();
-      }
-
-      // Statistical section
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    /** Writes the buffered rows to the output stream and empties the buffer; no-op when empty. */
-    private void flushBuffer() throws IOException {
-      if (os.getLength() <= 0) {
-        return;
-      }
-      os.writeTo(outputStream);
-      os.reset();
-      bufferedBytes = 0;
-    }
-    /** Returns the current write position: the stream position at init plus all serialized row bytes. */
-    @Override
-    public long getOffset() throws IOException {
-      return this.pos;
-    }
-
-    /** Pushes buffered rows into the output stream and then flushes that stream. */
-    @Override
-    public void flush() throws IOException {
-      this.flushBuffer();
-      this.outputStream.flush();
-    }
-
- @Override
- public void close() throws IOException { // Flushes pending rows, records stats, finishes compression, then releases the stream and the compressor.
-
- try {
- flush(); // drain the in-memory row buffer into outputStream and flush it
-
- // Statistical section
- if (enabledStats) {
- stats.setNumBytes(getOffset()); // getOffset() returns pos, which starts at fos.getPos() in init() and grows by each row's bytes
- }
-
- if(deflateFilter != null) {
- deflateFilter.finish(); // complete the compressed stream before fos is closed in the finally block
- deflateFilter.resetState();
- deflateFilter = null;
- }
-
- os.close();
- } finally {
- IOUtils.cleanup(LOG, fos); // NOTE(review): presumably closes fos and only logs close failures - confirm against Hadoop IOUtils
- if (compressor != null) {
- CodecPool.returnCompressor(compressor); // hand the compressor obtained in init() back to the pool
- compressor = null;
- }
- }
- }
-
-    /** Returns the collected table statistics, or {@code null} when statistics are disabled. */
-    @Override
-    public TableStats getStats() {
-      return enabledStats ? stats.getTableStat() : null;
-    }
-
-    /** Returns whether a compressor is currently held by this appender. */
-    public boolean isCompress() {
-      return null != compressor;
-    }
-
-    /** Returns the codec's default file extension, or an empty string when no codec is set. */
-    public String getExtension() {
-      if (codec == null) {
-        return "";
-      }
-      return codec.getDefaultExtension();
-    }
- }
-
- public static class CSVScanner extends FileScanner implements SeekableScanner {
-    /**
-     * Creates a scanner over one file fragment. The codec is derived from the fragment path;
-     * the scanner is splittable when there is no codec or the codec is splittable. Delimiter and
-     * null text are read from the TEXT_* options, falling back to the CSVFILE_* options.
-     */
-    public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
-        throws IOException {
-      super(conf, schema, meta, fragment);
-      factory = new CompressionCodecFactory(conf);
-      codec = factory.getCodec(fragment.getPath());
-      if (codec == null || codec instanceof SplittableCompressionCodec) {
-        splittable = true;
-      }
-
-      // Delimiter
-      String legacyDelimiter =
-          meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
-      String rawDelimiter = meta.getOption(StorageConstants.TEXT_DELIMITER, legacyDelimiter);
-      this.delimiter = StringEscapeUtils.unescapeJava(rawDelimiter).charAt(0);
-
-      // Null representation
-      String legacyNullText = meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT);
-      String rawNullText = meta.getOption(StorageConstants.TEXT_NULL, legacyNullText);
-      String nullText = StringEscapeUtils.unescapeJava(rawNullText);
-
-      if (StringUtils.isEmpty(nullText)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullText.getBytes();
-      }
-    }
-
- private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
- private char delimiter;
- private FileSystem fs;
- private FSDataInputStream fis;
- private InputStream is; //decompressd stream
- private CompressionCodecFactory factory;
- private CompressionCodec codec;
- private Decompressor decompressor;
- private Seekable filePosition;
- private boolean splittable = false;
- private long startOffset, end, pos;
- private int currentIdx = 0, validIdx = 0, recordCount = 0;
- private int[] targetColumnIndexes;
- private boolean eof = false;
- private final byte[] nullChars;
- private SplitLineReader reader;
- private ArrayList<Long> fileOffsets;
- private ArrayList<Integer> rowLengthList;
- private ArrayList<Integer> startOffsets;
- private NonSyncByteArrayOutputStream buffer;
- private SerializerDeserializer serde;
-
- @Override
- public void init() throws IOException { // Opens the fragment, builds the plain or compressed line reader, resolves projected columns and loads the first page.
- fileOffsets = new ArrayList<Long>();
- rowLengthList = new ArrayList<Integer>();
- startOffsets = new ArrayList<Integer>();
- buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
-
- // FileFragment information
- if(fs == null) {
- fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
- }
- if(fis == null) fis = fs.open(fragment.getPath()); // an already-open stream is reused, e.g. when reset() calls init() again
-
- recordCount = 0;
- pos = startOffset = fragment.getStartKey();
- end = startOffset + fragment.getEndKey(); // NOTE(review): treats getEndKey() as a length added to the start - confirm against FileFragment
-
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
- fis, decompressor, startOffset, end,
- SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
- reader = new CompressedSplitLineReader(cIn, conf, null);
- startOffset = cIn.getAdjustedStart(); // adopt the range as adjusted by the codec stream
- end = cIn.getAdjustedEnd();
- filePosition = cIn;
- is = cIn;
- } else {
- is = new DataInputStream(codec.createInputStream(fis, decompressor)); // no seek to startOffset happens on this path
- reader = new SplitLineReader(is, null);
- filePosition = fis;
- }
- } else {
- fis.seek(startOffset); // uncompressed input: jump straight to the fragment start
- filePosition = fis;
- is = fis;
- reader = new SplitLineReader(is, null);
- }
-
- if (targets == null) {
- targets = schema.toArray(); // no projection given: read every column of the schema
- }
-
- targetColumnIndexes = new int[targets.length];
- for (int i = 0; i < targets.length; i++) {
- targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
- }
-
- try {
- //FIXME
- String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
- TextSerializerDeserializer.class.getName());
- serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new IOException(e);
- }
-
- super.init();
- Arrays.sort(targetColumnIndexes); // ascending column ids are what next() passes to splitPreserveAllTokens
- if (LOG.isDebugEnabled()) {
- LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
- "," + fs.getFileStatus(fragment.getPath()).getLen());
- }
-
- if (startOffset != 0) {
- pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos)); // read one line into a throwaway Text and advance pos; presumably skips a partial first line owned by the previous fragment
- }
- eof = false;
- page(); // preload the first page of rows
- }
-
-    /**
-     * Returns the byte limit for a single line read: unlimited for compressed input, otherwise
-     * the bytes left between {@code pos} and the fragment end, capped at Integer.MAX_VALUE.
-     */
-    private int maxBytesToConsume(long pos) {
-      if (isCompress()) {
-        return Integer.MAX_VALUE;
-      }
-      long remaining = end - pos;
-      return (int) Math.min(Integer.MAX_VALUE, remaining);
-    }
-
-    /** Returns the number of bytes between the current file position and the fragment end. */
-    private long fragmentable() throws IOException {
-      long current = getFilePosition();
-      return end - current;
-    }
-
-    /**
-     * Returns the position used for end-of-fragment checks: the underlying stream position for
-     * compressed input, otherwise the scanner's own byte counter.
-     */
-    private long getFilePosition() throws IOException {
-      return isCompress() ? filePosition.getPos() : pos;
-    }
-
- private void page() throws IOException { // Refills the row buffer with lines until more than DEFAULT_PAGE_SIZE bytes are consumed, recording per-row offsets.
-// // Index initialization
- currentIdx = 0;
- validIdx = 0;
- int currentBufferPos = 0;
- int bufferedSize = 0;
-
- buffer.reset();
- startOffsets.clear();
- rowLengthList.clear();
- fileOffsets.clear();
-
- if(eof) {
- return; // nothing left: the page stays empty (currentIdx == validIdx == 0)
- }
-
- while (DEFAULT_PAGE_SIZE >= bufferedSize){
-
- int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
-
- if(ret == 0){
- break; // the reader consumed no bytes, so there is no further row to buffer
- } else {
- fileOffsets.add(pos); // file offset at which this row starts
- pos += ret;
- startOffsets.add(currentBufferPos); // where this row starts inside buffer
- currentBufferPos += rowLengthList.get(rowLengthList.size() - 1); // presumably readDefaultLine appended this row's length - confirm against SplitLineReader
- bufferedSize += ret;
- validIdx++;
- recordCount++;
- }
-
- if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
- eof = true; // past the fragment end and the reader asks for no extra record: this is the last page
- break;
- }
- }
- if (tableStats != null) {
- tableStats.setReadBytes(pos - startOffset);
- tableStats.setNumRows(recordCount);
- }
- }
-
-    /**
-     * Returns the scan progress in [0, 1]: 1 at end of fragment, 0 when nothing has been read
-     * or the position cannot be obtained, otherwise read bytes over read plus remaining bytes.
-     */
-    @Override
-    public float getProgress() {
-      try {
-        if (eof) {
-          return 1.0f;
-        }
-
-        final long filePos = getFilePosition();
-        if (startOffset == filePos) {
-          return 0.0f;
-        }
-
-        final long readBytes = filePos - startOffset;
-        final long remainingBytes = Math.max(end - filePos, 0);
-        final float ratio = (float) (readBytes) / (float) (readBytes + remainingBytes);
-        return Math.min(1.0f, ratio);
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        return 0.0f;
-      }
-    }
-
-    /**
-     * Returns the next row as a lazily decoded tuple, loading another page when the current one
-     * is used up, or {@code null} when the fragment has no more rows.
-     *
-     * @throws IOException wrapping any failure raised while paging or splitting the row
-     */
-    @Override
-    public Tuple next() throws IOException {
-      try {
-        if (currentIdx == validIdx) {
-          if (eof) {
-            return null;
-          }
-          page();
-          if (currentIdx == validIdx) {
-            return null;
-          }
-        }
-
-        // File offsets are only meaningful for uncompressed input.
-        long offset = isCompress() ? -1 : fileOffsets.get(currentIdx);
-
-        int rowStart = startOffsets.get(currentIdx);
-        int rowLength = rowLengthList.get(currentIdx);
-        byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), rowStart, rowLength,
-            delimiter, targetColumnIndexes);
-        currentIdx++;
-        return new LazyTuple(schema, cells, offset, nullChars, serde);
-      } catch (Throwable t) {
-        LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
-        LOG.error("Tuple list current index: " + currentIdx, t);
-        throw new IOException(t);
-      }
-    }
-
-    /** Returns whether a compression codec was resolved for the fragment's path. */
-    private boolean isCompress() {
-      return null != codec;
-    }
-
-    /** Returns any held decompressor to the pool and re-initializes the scanner via init(). */
-    @Override
-    public void reset() throws IOException {
-      boolean holdsDecompressor = decompressor != null;
-      if (holdsDecompressor) {
-        CodecPool.returnDecompressor(decompressor);
-        decompressor = null;
-      }
-
-      this.init();
-    }
-
-    /**
-     * Publishes final read statistics, closes the reader and streams, and always returns the
-     * decompressor to the pool.
-     */
-    @Override
-    public void close() throws IOException {
-      try {
-        if (tableStats != null) {
-          // Actual Processed Bytes. (decompressed bytes + overhead)
-          tableStats.setReadBytes(pos - startOffset);
-          tableStats.setNumRows(recordCount);
-        }
-
-        IOUtils.cleanup(LOG, reader, is, fis);
-
-        // Drop the references so a later init() reopens everything.
-        fs = null;
-        is = null;
-        fis = null;
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("CSVScanner processed record:" + recordCount);
-        }
-      } finally {
-        if (decompressor != null) {
-          CodecPool.returnDecompressor(decompressor);
-          decompressor = null;
-        }
-      }
-    }
-
-    /** Always {@code true}: this scanner reports itself as projectable. */
-    @Override
-    public boolean isProjectable() {
-      final boolean projectable = true;
-      return projectable;
-    }
-
-    /** Always {@code false}: this scanner reports itself as not selectable. */
-    @Override
-    public boolean isSelectable() {
-      final boolean selectable = false;
-      return selectable;
-    }
-
-    /** Intentionally does nothing; the given expression is ignored. */
-    @Override
-    public void setSearchCondition(Object expr) {
-      // no-op
-    }
-
- @Override
- public void seek(long offset) throws IOException { // Positions the scanner at the row starting at the given file offset; throws UnsupportedException for compressed input.
- if(isCompress()) throw new UnsupportedException();
-
- int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset); // look for the offset among the row offsets of the current page
-
- if (tupleIndex > -1) {
- this.currentIdx = tupleIndex; // the row is already buffered: only move the cursor
- } else if (isSplittable() && end >= offset || startOffset <= offset) { // NOTE(review): && binds tighter than ||, so any offset >= startOffset is accepted, even past end - confirm intent
- eof = false;
- fis.seek(offset);
- pos = offset;
- reader.reset();
- this.currentIdx = 0; // equal indexes make the next next()/getNextOffset() call page() from the new position
- this.validIdx = 0;
- // pageBuffer();
- } else {
- throw new IOException("invalid offset " +
- " < start : " + startOffset + " , " +
- " end : " + end + " , " +
- " filePos : " + filePosition.getPos() + " , " +
- " input offset : " + offset + " >");
- }
- }
-
-    /**
-     * Returns the file offset of the row that next() would return, loading another page if the
-     * current one is used up, or -1 when no further row is available. Unsupported for
-     * compressed input.
-     */
-    @Override
-    public long getNextOffset() throws IOException {
-      if (isCompress()) {
-        throw new UnsupportedException();
-      }
-
-      boolean pageExhausted = this.currentIdx == this.validIdx;
-      if (pageExhausted) {
-        if (fragmentable() <= 0) {
-          return -1;
-        }
-        page();
-        if (currentIdx == validIdx) {
-          return -1;
-        }
-      }
-      return fileOffsets.get(currentIdx);
-    }
-
-    /** Returns the splittable flag computed in the constructor from the fragment's codec. */
-    @Override
-    public boolean isSplittable() {
-      return this.splittable;
-    }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
deleted file mode 100644
index 4f58e68..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * Line reader for compressed splits
- *
- * Reading records from a compressed split is tricky, as the
- * LineRecordReader is using the reported compressed input stream
- * position directly to determine when a split has ended. In addition the
- * compressed input stream is usually faking the actual byte position, often
- * updating it only after the first compressed block after the split is
- * accessed.
- *
- * Depending upon where the last compressed block of the split ends relative
- * to the record delimiters it can be easy to accidentally drop the last
- * record or duplicate the last record between this split and the next.
- *
- * Split end scenarios:
- *
- * 1) Last block of split ends in the middle of a record
- * Nothing special that needs to be done here, since the compressed input
- * stream will report a position after the split end once the record
- * is fully read. The consumer of the next split will discard the
- * partial record at the start of the split normally, and no data is lost
- * or duplicated between the splits.
- *
- * 2) Last block of split ends in the middle of a delimiter
- * The line reader will continue to consume bytes into the next block to
- * locate the end of the delimiter. If a custom delimiter is being used
- * then the next record must be read by this split or it will be dropped.
- * The consumer of the next split will not recognize the partial
- * delimiter at the beginning of its split and will discard it along with
- * the next record.
- *
- * However for the default delimiter processing there is a special case
- * because CR, LF, and CRLF are all valid record delimiters. If the
- * block ends with a CR then the reader must peek at the next byte to see
- * if it is an LF and therefore part of the same record delimiter.
- * Peeking at the next byte is an access to the next block and triggers
- * the stream to report the end of the split. There are two cases based
- * on the next byte:
- *
- * A) The next byte is LF
- * The split needs to end after the current record is returned. The
- * consumer of the next split will discard the first record, which
- * is degenerate since LF is itself a delimiter, and start consuming
- * records after that byte. If the current split tries to read
- * another record then the record will be duplicated between splits.
- *
- * B) The next byte is not LF
- * The current record will be returned but the stream will report
- * the split has ended due to the peek into the next block. If the
- * next record is not read then it will be lost, as the consumer of
- * the next split will discard it before processing subsequent
- * records. Therefore the next record beyond the reported split end
- * must be consumed by this split to avoid data loss.
- *
- * 3) Last block of split ends at the beginning of a delimiter
- * This is equivalent to case 1, as the reader will consume bytes into
- * the next block and trigger the end of the split. No further records
- * should be read as the consumer of the next split will discard the
- * (degenerate) record at the beginning of its split.
- *
- * 4) Last block of split ends at the end of a delimiter
- * Nothing special needs to be done here. The reader will not start
- * examining the bytes into the next block until the next record is read,
- * so the stream will not report the end of the split just yet. Once the
- * next record is read then the next block will be accessed and the
- * stream will indicate the end of the split. The consumer of the next
- * split will correctly discard the first record of its split, and no
- * data is lost or duplicated.
- *
- * If the default delimiter is used and the block ends at a CR then this
- * is treated as case 2 since the reader does not yet know without
- * looking at subsequent bytes whether the delimiter has ended.
- *
- * NOTE: It is assumed that compressed input streams *never* return bytes from
- * multiple compressed blocks from a single read. Failure to do so will
- * violate the buffering performed by this class, as it will access
- * bytes into the next block after the split before returning all of the
- * records from the previous block.
- */
-
-public class CompressedSplitLineReader extends SplitLineReader {
-  /** The compressed split stream whose reported position decides when the split has ended. */
-  SplitCompressionInputStream scin;
-  /** True when no custom record delimiter was given, i.e. CR, LF and CRLF all end a record. */
-  private boolean usingCRLF;
-  /** Set when a buffer refill happened in the middle of a record delimiter. */
-  private boolean needAdditionalRecord = false;
-  /** Set once the stream position has been seen past the adjusted split end. */
-  private boolean finished = false;
-
-  /**
-   * @param in                   compressed split stream to read lines from
-   * @param conf                 configuration passed through to the parent reader
-   * @param recordDelimiterBytes custom record delimiter, or {@code null} for the default one
-   */
-  public CompressedSplitLineReader(SplitCompressionInputStream in,
-                                   Configuration conf,
-                                   byte[] recordDelimiterBytes)
-      throws IOException {
-    super(in, conf, recordDelimiterBytes);
-    this.scin = in;
-    this.usingCRLF = (recordDelimiterBytes == null);
-  }
-
-  /**
-   * Refills {@code buffer} from {@code in} and remembers whether one more record must be read
-   * after the split end.
-   */
-  @Override
-  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
-      throws IOException {
-    final int bytesRead = in.read(buffer);
-
-    // If the split ended in the middle of a record delimiter then we need
-    // to read one additional record, as the consumer of the next split will
-    // not recognize the partial delimiter as a record.
-    // However if using the default delimiter and the next character is a
-    // linefeed then next split will treat it as a delimiter all by itself
-    // and the additional record read should not be performed.
-    if (inDelimiter && bytesRead > 0) {
-      needAdditionalRecord = !usingCRLF || buffer[0] != '\n';
-    }
-    return bytesRead;
-  }
-
-  /** Reads one line into {@code str}; returns 0 without reading once the split is finished. */
-  @Override
-  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    if (finished) {
-      return 0;
-    }
-
-    // only allow at most one more record to be read after the stream
-    // reports the split ended
-    if (scin.getPos() > scin.getAdjustedEnd()) {
-      finished = true;
-    }
-    return super.readLine(str, maxLineLength, maxBytesToConsume);
-  }
-
-  /** Buffer-based variant of readLine; returns 0 without reading once the split is finished. */
-  @Override
-  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
-      , int maxBytesToConsume) throws IOException {
-    if (finished) {
-      return 0;
-    }
-
-    // only allow at most one more record to be read after the stream
-    // reports the split ended
-    if (scin.getPos() > scin.getAdjustedEnd()) {
-      finished = true;
-    }
-    return super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
-  }
-
-  /** Returns true while the split is not finished and an extra record is still required. */
-  @Override
-  public boolean needAdditionalRecordAfterSplit() {
-    if (finished) {
-      return false;
-    }
-    return needAdditionalRecord;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
deleted file mode 100644
index 8841a31..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-/**
- * Immutable pair of a host name and the id of a volume on that host.
- */
-public class DataLocation {
-  // Both fields are assigned once in the constructor and never modified, so they are final.
-  private final String host;
-  private final int volumeId;
-
-  /**
-   * @param host     host name
-   * @param volumeId id of the volume on that host
-   */
-  public DataLocation(String host, int volumeId) {
-    this.host = host;
-    this.volumeId = volumeId;
-  }
-
-  /** Returns the host name given at construction. */
-  public String getHost() {
-    return host;
-  }
-
-  /** Returns the volume id given at construction. */
-  public int getVolumeId() {
-    return volumeId;
-  }
-
-  /** Returns a string of the form {@code DataLocation{host=<host>, volumeId=<id>}}. */
-  @Override
-  public String toString() {
-    return "DataLocation{" +
-        "host=" + host +
-        ", volumeId=" + volumeId +
-        '}';
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
deleted file mode 100644
index 2396349..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A disk device identified by a numeric id, with an optional name and its list of mounts.
- */
-public class DiskDeviceInfo {
-  private int id;
-  private String name;
-
-  private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
-
-  /** Creates a device with the given id, no name and an empty mount list. */
-  public DiskDeviceInfo(int id) {
-    this.id = id;
-  }
-
-  /** Returns the device id. */
-  public int getId() {
-    return this.id;
-  }
-
-  /** Returns the device name, or {@code null} if none was set. */
-  public String getName() {
-    return this.name;
-  }
-
-  /** Sets the device name. */
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  /** Returns {@code "<id>,<name>"}. */
-  @Override
-  public String toString() {
-    StringBuilder text = new StringBuilder();
-    text.append(id).append(",").append(name);
-    return text.toString();
-  }
-
-  /** Appends a mount to this device's mount list. */
-  public void addMountPath(DiskMountInfo diskMountInfo) {
-    this.mountInfos.add(diskMountInfo);
-  }
-
-  /** Returns the live mount list (not a copy). */
-  public List<DiskMountInfo> getMountInfos() {
-    return this.mountInfos;
-  }
-
-  /** Replaces the mount list with the given list. */
-  public void setMountInfos(List<DiskMountInfo> mountInfos) {
-    this.mountInfos = mountInfos;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
deleted file mode 100644
index 22f18ba..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-/**
- * Mutable description of a disk partition: id, partition name, mount path, capacity and usage.
- */
-public class DiskInfo {
-  private int id;
-  private String partitionName;
-  private String mountPath;
-
-  private long capacity;
-  private long used;
-
-  /** Creates a disk entry with the given id and partition name; other fields keep defaults. */
-  public DiskInfo(int id, String partitionName) {
-    this.id = id;
-    this.partitionName = partitionName;
-  }
-
-  /** Returns the disk id. */
-  public int getId() {
-    return this.id;
-  }
-
-  /** Sets the disk id. */
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  /** Returns the partition name. */
-  public String getPartitionName() {
-    return this.partitionName;
-  }
-
-  /** Sets the partition name. */
-  public void setPartitionName(String partitionName) {
-    this.partitionName = partitionName;
-  }
-
-  /** Returns the mount path, or {@code null} if none was set. */
-  public String getMountPath() {
-    return this.mountPath;
-  }
-
-  /** Sets the mount path. */
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  /** Returns the capacity value. */
-  public long getCapacity() {
-    return this.capacity;
-  }
-
-  /** Sets the capacity value. */
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  /** Returns the used value. */
-  public long getUsed() {
-    return this.used;
-  }
-
-  /** Sets the used value. */
-  public void setUsed(long used) {
-    this.used = used;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
deleted file mode 100644
index aadb0e7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-
-/**
- * A mount point belonging to a disk device, plus capacity/usage counters.
- *
- * <p>Natural ordering ({@link #compareTo(DiskMountInfo)}) places deeper mount
- * paths first, then longer paths, and finally falls back to plain string
- * order. {@link #equals(Object)} is defined through {@code compareTo}, so two
- * instances are equal exactly when their mount paths are equal; the device id
- * and the counters do not take part in equality or hashing.
- */
-public class DiskMountInfo implements Comparable<DiskMountInfo> {
-  private String mountPath;
-
-  private long capacity;
-  private long used;
-
-  private int deviceId;
-
-  /**
-   * @param deviceId id of the device this mount point belongs to
-   * @param mountPath path at which the device is mounted
-   */
-  public DiskMountInfo(int deviceId, String mountPath) {
-    // Previously the deviceId argument was silently dropped, which left
-    // getDeviceId() returning 0 for every mount point.
-    this.deviceId = deviceId;
-    this.mountPath = mountPath;
-  }
-
-  /** @return the mount path */
-  public String getMountPath() {
-    return mountPath;
-  }
-
-  /** @param mountPath replacement mount path */
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  /** @return the stored capacity value ({@code 0} until set) */
-  public long getCapacity() {
-    return capacity;
-  }
-
-  /** @param capacity capacity value to store */
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  /** @return the stored used value ({@code 0} until set) */
-  public long getUsed() {
-    return used;
-  }
-
-  /** @param used used value to store */
-  public void setUsed(long used) {
-    this.used = used;
-  }
-
-  /** @return id of the device this mount point belongs to */
-  public int getDeviceId() {
-    return deviceId;
-  }
-
-  /** Two mount infos are equal when {@link #compareTo(DiskMountInfo)} yields 0. */
-  @Override
-  public boolean equals(Object obj) {
-    return obj instanceof DiskMountInfo && compareTo((DiskMountInfo) obj) == 0;
-  }
-
-  /** Hash over the mount path only, matching {@link #equals(Object)}. */
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(mountPath);
-  }
-
-  /**
-   * Orders mount points so that the most specific path comes first:
-   * greater depth first, then greater length first, then string order.
-   */
-  @Override
-  public int compareTo(DiskMountInfo other) {
-    String path1 = mountPath;
-    String path2 = other.mountPath;
-
-    int path1Depth = depthOf(path1);
-    int path2Depth = depthOf(path2);
-    if (path1Depth != path2Depth) {
-      return path1Depth > path2Depth ? -1 : 1;
-    }
-
-    int path1Length = path1.length();
-    int path2Length = path2.length();
-    if (path1Length != path2Length) {
-      return path1Length > path2Length ? -1 : 1;
-    }
-
-    return path1.compareTo(path2);
-  }
-
-  /** Number of "/" separators in the path, with the root path "/" counted as depth 0. */
-  private static int depthOf(String path) {
-    return "/".equals(path) ? 0 : path.split("/", -1).length - 1;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
deleted file mode 100644
index 2d68870..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Util;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-
-public class DiskUtil {
-
- static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
-
- public enum OSType {
- OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
- }
-
- static private OSType getOSType() {
- String osName = System.getProperty("os.name");
- if (osName.contains("Windows")
- && (osName.contains("XP") || osName.contains("2003")
- || osName.contains("Vista")
- || osName.contains("Windows_7")
- || osName.contains("Windows 7") || osName
- .contains("Windows7"))) {
- return OSType.OS_TYPE_WINXP;
- } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
- return OSType.OS_TYPE_SOLARIS;
- } else if (osName.contains("Mac")) {
- return OSType.OS_TYPE_MAC;
- } else {
- return OSType.OS_TYPE_UNIX;
- }
- }
-
- public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
- List<DiskDeviceInfo> deviceInfos;
-
- if(getOSType() == OSType.OS_TYPE_UNIX) {
- deviceInfos = getUnixDiskDeviceInfos();
- setDeviceMountInfo(deviceInfos);
- } else {
- deviceInfos = getDefaultDiskDeviceInfos();
- }
-
- return deviceInfos;
- }
-
- private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
- List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
- File file = new File(UNIX_DISK_DEVICE_PATH);
- if(!file.exists()) {
- System.out.println("No partition file:" + file.getAbsolutePath());
- return getDefaultDiskDeviceInfos();
- }
-
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
- String line = null;
-
- int count = 0;
- Set<String> deviceNames = new TreeSet<String>();
- while((line = reader.readLine()) != null) {
- if(count > 0 && !line.trim().isEmpty()) {
- String[] tokens = line.trim().split(" +");
- if(tokens.length == 4) {
- String deviceName = getDiskDeviceName(tokens[3]);
- deviceNames.add(deviceName);
- }
- }
- count++;
- }
-
- int id = 0;
- for(String eachDeviceName: deviceNames) {
- DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
- diskDeviceInfo.setName(eachDeviceName);
-
- //TODO set addtional info
- // /sys/block/sda/queue
- infos.add(diskDeviceInfo);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if(reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- }
- }
- }
-
- return infos;
- }
-
- private static String getDiskDeviceName(String partitionName) {
- byte[] bytes = partitionName.getBytes();
-
- byte[] result = new byte[bytes.length];
- int length = 0;
- for(int i = 0; i < bytes.length; i++, length++) {
- if(bytes[i] >= '0' && bytes[i] <= '9') {
- break;
- } else {
- result[i] = bytes[i];
- }
- }
-
- return new String(result, 0, length);
- }
-
- public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
- DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
- diskDeviceInfo.setName("default");
-
- List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
- infos.add(diskDeviceInfo);
-
- return infos;
- }
-
-
- private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
- Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
- for(DiskDeviceInfo eachDevice: deviceInfos) {
- deviceMap.put(eachDevice.getName(), eachDevice);
- }
-
- BufferedReader mountOutput = null;
- try {
- Process mountProcess = Runtime.getRuntime().exec("mount");
- mountOutput = new BufferedReader(new InputStreamReader(
- mountProcess.getInputStream()));
- while (true) {
- String line = mountOutput.readLine();
- if (line == null) {
- break;
- }
-
- int indexStart = line.indexOf(" on /");
- int indexEnd = line.indexOf(" ", indexStart + 4);
-
- String deviceName = line.substring(0, indexStart).trim();
- String[] deviceNameTokens = deviceName.split("/");
- if(deviceNameTokens.length == 3) {
- if("dev".equals(deviceNameTokens[1])) {
- String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
- String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
-
- DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
- if(diskDeviceInfo != null) {
- diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
- }
- }
- }
- }
- } catch (IOException e) {
- throw e;
- } finally {
- if (mountOutput != null) {
- mountOutput.close();
- }
- }
- }
-
- public static int getDataNodeStorageSize(){
- return getStorageDirs().size();
- }
-
- public static List<URI> getStorageDirs(){
- Configuration conf = new HdfsConfiguration();
- Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
- return Util.stringCollectionAsURIs(dirNames);
- }
-
- public static void main(String[] args) throws Exception {
- System.out.println("/dev/sde1".split("/").length);
- for(String eachToken: "/dev/sde1".split("/")) {
- System.out.println(eachToken);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
deleted file mode 100644
index 0b3755d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.text.TextLineParsingError;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-
-/**
- * Converts a single field value between its {@link Datum} form and its
- * serialized byte form.
- */
-public interface FieldSerializerDeserializer {
-
-  /**
-   * Writes {@code datum} to {@code out}.
-   *
-   * @param out stream that receives the serialized field
-   * @param datum value to serialize
-   * @param col column the value belongs to
-   * @param columnIndex index of that column
-   * @param nullChars byte sequence associated with null values
-   * @return an int result; presumably the number of bytes written (confirm against implementations)
-   * @throws IOException if writing to {@code out} fails
-   */
-  int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars)
-      throws IOException;
-
-  /**
-   * Reads one field value from {@code buf}.
-   *
-   * @param buf buffer holding the serialized field
-   * @param col column the value belongs to
-   * @param columnIndex index of that column
-   * @param nullChars byte sequence associated with null values
-   * @return the deserialized value
-   * @throws IOException if reading fails
-   * @throws TextLineParsingError if the field content cannot be parsed
-   */
-  Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
-      throws IOException, TextLineParsingError;
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
deleted file mode 100644
index 04278e9..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-
-import java.io.IOException;
-
-/**
- * Common state for appenders that write to a file {@link Path}: the
- * configuration, schema, table meta and target path, plus the
- * initialised/statistics flags.
- *
- * <p>Life cycle: optionally call {@link #enableStats()}, then {@link #init()}
- * exactly once. Both methods reject calls made in the wrong order with an
- * {@link IllegalStateException}.
- */
-public abstract class FileAppender implements Appender {
-  /** Set by {@link #init()}; guards against double initialisation. */
-  protected boolean inited = false;
-
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final Path path;
-
-  /** Whether statistics collection was requested before {@link #init()}. */
-  protected boolean enabledStats;
-
-  public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) {
-    this.conf = conf;
-    this.schema = schema;
-    this.meta = meta;
-    this.path = path;
-  }
-
-  /**
-   * Marks this appender as initialised.
-   *
-   * @throws IllegalStateException if it has been initialised before
-   */
-  public void init() throws IOException {
-    if (this.inited) {
-      throw new IllegalStateException("FileAppender is already initialized.");
-    }
-    this.inited = true;
-  }
-
-  /**
-   * Requests statistics collection.
-   *
-   * @throws IllegalStateException if {@link #init()} has already run
-   */
-  public void enableStats() {
-    if (this.inited) {
-      throw new IllegalStateException("Should enable this option before init()");
-    }
-    this.enabledStats = true;
-  }
-
-  /** @return the current value of {@link #getOffset()} */
-  public long getEstimatedOutputSize() throws IOException {
-    return getOffset();
-  }
-
-  /** @return the current offset, as defined by the concrete appender */
-  public abstract long getOffset() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
deleted file mode 100644
index f15c4c9..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-/**
- * Common state for scanners that read a single {@link FileFragment}: the
- * configuration, schema, table meta, target columns, progress and input
- * statistics.
- *
- * <p>{@link #setTarget(Column[])} and {@link #setSearchCondition(Object)} must
- * be called before {@link #init()}; afterwards they throw
- * {@link IllegalStateException}.
- */
-public abstract class FileScanner implements Scanner {
-  private static final Log LOG = LogFactory.getLog(FileScanner.class);
-
-  protected boolean inited = false;
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final FileFragment fragment;
-  /** Number of columns in {@link #schema}, captured at construction time. */
-  protected final int columnNum;
-
-  protected Column [] targets;
-
-  protected float progress;
-
-  protected TableStats tableStats;
-
-  public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.fragment = fragment;
-    this.tableStats = new TableStats();
-    this.columnNum = this.schema.size();
-  }
-
-  /**
-   * Marks the scanner as initialised, resets progress to 0 and seeds the
-   * input statistics: the byte count is taken from the fragment's end key, the
-   * block count is set to 1, and one {@link ColumnStats} is registered per
-   * schema column.
-   */
-  public void init() throws IOException {
-    inited = true;
-    progress = 0.0f;
-
-    if (fragment != null) {
-      tableStats.setNumBytes(fragment.getEndKey());
-      tableStats.setNumBlocks(1);
-    }
-
-    if (schema != null) {
-      for (Column eachColumn : schema.getColumns()) {
-        ColumnStats columnStats = new ColumnStats(eachColumn);
-        tableStats.addColumnStat(columnStats);
-      }
-    }
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  /**
-   * Sets the columns to be read.
-   *
-   * @throws IllegalStateException if {@link #init()} has already run
-   */
-  @Override
-  public void setTarget(Column[] targets) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-    this.targets = targets;
-  }
-
-  /**
-   * Only enforces the call order; the expression itself is not stored here.
-   *
-   * @throws IllegalStateException if {@link #init()} has already run
-   */
-  public void setSearchCondition(Object expr) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-  }
-
-  /**
-   * Obtains the {@link FileSystem} for {@code path}. When a user name is
-   * configured under {@code TajoConf.ConfVars.USERNAME} the file system is
-   * requested on behalf of that user; if that request is interrupted, or no
-   * user is configured, the plain lookup is used.
-   *
-   * @param tajoConf configuration to read the user name from and to pass to Hadoop
-   * @param path path whose URI selects the file system
-   * @return the file system for {@code path}
-   * @throws IOException if the file system cannot be obtained
-   */
-  public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
-    String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
-    if (tajoUser == null) {
-      return FileSystem.get(path.toUri(), tajoConf);
-    }
-
-    try {
-      return FileSystem.get(path.toUri(), tajoConf, tajoUser);
-    } catch (InterruptedException e) {
-      // Keep the cause in the log instead of dropping it.
-      LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]", e);
-      try {
-        return FileSystem.get(path.toUri(), tajoConf);
-      } finally {
-        // Restore the interrupt status only after the fallback lookup, so the
-        // lookup itself is not cut short, while callers can still see that
-        // the thread was interrupted.
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  @Override
-  public float getProgress() {
-    return progress;
-  }
-
-  @Override
-  public TableStats getInputStats() {
-    return tableStats;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
deleted file mode 100644
index 8b7e2e0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-
-/**
- * An instance of FrameTuple is an immutable tuple.
- * It contains two tuples and pretends to be one instance of Tuple for
- * join qual evaluatations.
- */
-public class FrameTuple implements Tuple, Cloneable {
- private int size;
- private int leftSize;
-
- private Tuple left;
- private Tuple right;
-
- public FrameTuple() {}
-
- public FrameTuple(Tuple left, Tuple right) {
- set(left, right);
- }
-
- public void set(Tuple left, Tuple right) {
- this.size = left.size() + right.size();
- this.left = left;
- this.leftSize = left.size();
- this.right = right;
- }
-
- @Override
- public int size() {
- return size;
- }
-
- @Override
- public boolean contains(int fieldId) {
- Preconditions.checkArgument(fieldId < size,
- "Out of field access: " + fieldId);
-
- if (fieldId < leftSize) {
- return left.contains(fieldId);
- } else {
- return right.contains(fieldId - leftSize);
- }
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return get(fieldid).isNull();
- }
-
- @Override
- public boolean isNotNull(int fieldid) {
- return !isNull(fieldid);
- }
-
- @Override
- public void clear() {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException();
- }
-
- @Override
- public void setOffset(long offset) {
- throw new UnsupportedException();
- }
-
- @Override
- public long getOffset() {
- throw new UnsupportedException();
- }
-
- @Override
- public void put(Datum [] values) {
- throw new UnsupportedException();
- }
-
- @Override
- public Datum get(int fieldId) {
- Preconditions.checkArgument(fieldId < size,
- "Out of field access: " + fieldId);
-
- if (fieldId < leftSize) {
- return left.get(fieldId);
- } else {
- return right.get(fieldId - leftSize);
- }
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return get(fieldId).asBool();
- }
-
- @Override
- public byte getByte(int fieldId) {
- return get(fieldId).asByte();
- }
-
- @Override
- public char getChar(int fieldId) {
- return get(fieldId).asChar();
- }
-
- @Override
- public byte [] getBytes(int fieldId) {
- return get(fieldId).asByteArray();
- }
-
- @Override
- public short getInt2(int fieldId) {
- return get(fieldId).asInt2();
- }
-
- @Override
- public int getInt4(int fieldId) {
- return get(fieldId).asInt4();
- }
-
- @Override
- public long getInt8(int fieldId) {
- return get(fieldId).asInt8();
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return get(fieldId).asFloat4();
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return get(fieldId).asFloat8();
- }
-
- @Override
- public String getText(int fieldId) {
- return get(fieldId).asChars();
- }
-
- @Override
- public ProtobufDatum getProtobufDatum(int fieldId) {
- return (ProtobufDatum) get(fieldId);
- }
-
- @Override
- public IntervalDatum getInterval(int fieldId) {
- return (IntervalDatum) get(fieldId);
- }
-
- @Override
- public char [] getUnicodeChars(int fieldId) {
- return get(fieldId).asUnicodeChars();
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- FrameTuple frameTuple = (FrameTuple) super.clone();
- frameTuple.set(this.left.clone(), this.right.clone());
- return frameTuple;
- }
-
- @Override
- public Datum[] getValues(){
- throw new UnsupportedException();
- }
-
- public String toString() {
- boolean first = true;
- StringBuilder str = new StringBuilder();
- str.append("(");
- for(int i=0; i < size(); i++) {
- if(contains(i)) {
- if(first) {
- first = false;
- } else {
- str.append(", ");
- }
- str.append(i)
- .append("=>")
- .append(get(i));
- }
- }
- str.append(")");
- return str.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
deleted file mode 100644
index 40cad32..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Appender for one hash-shuffle partition. Tuples from several tasks are
- * written through a shared {@link FileAppender}; while writing, the output is
- * divided into pages and, per task, the row ranges written into each page are
- * recorded.
- *
- * <p>Writes, flush, close and {@link #getStats()} synchronize on the wrapped
- * appender. Once closed, further writes and flushes are silently ignored.
- */
-public class HashShuffleAppender implements Appender {
-  private static final Log LOG = LogFactory.getLog(HashShuffleAppender.class);
-
-  private FileAppender appender;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private int partId;
-
-  private TableStats tableStats;
-
-  //<taskId,<page start offset,<task start, task end>>>
-  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
-
-  //page start offset, length
-  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-
-  private Pair<Long, Integer> currentPage;
-
-  // NOTE(review): originally annotated "MB", but the value is compared directly
-  // against offsets returned by the appender - confirm the unit with the caller.
-  private int pageSize;
-
-  private int rowNumInPage;
-
-  private int totalRows;
-
-  /** File length captured by {@link #close()}; served by {@link #getOffset()} afterwards. */
-  private long offset;
-
-  private ExecutionBlockId ebId;
-
-  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
-    this.ebId = ebId;
-    this.partId = partId;
-    this.appender = appender;
-    this.pageSize = pageSize;
-  }
-
-  /** Starts the first page at offset 0 and resets the per-task index. */
-  @Override
-  public void init() throws IOException {
-    currentPage = new Pair<Long, Integer>(0L, 0);
-    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
-    rowNumInPage = 0;
-  }
-
-  /**
-   * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition.
-   * After writing if a current page exceeds pageSize, pageOffset will be added.
-   * @param taskId task that produced the tuples
-   * @param tuples tuples to append
-   * @return written bytes, or 0 if this appender is already closed
-   * @throws IOException if the underlying appender fails
-   */
-  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return 0;
-      }
-      long currentPos = appender.getOffset();
-
-      for (Tuple eachTuple: tuples) {
-        appender.addTuple(eachTuple);
-      }
-      long posAfterWritten = appender.getOffset();
-
-      int writtenBytes = (int)(posAfterWritten - currentPos);
-
-      // Record which rows of the current page belong to this task.
-      int nextRowNum = rowNumInPage + tuples.size();
-      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
-      if (taskIndexes == null) {
-        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-        taskTupleIndexes.put(taskId, taskIndexes);
-      }
-      taskIndexes.add(
-          new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(),
-              new Pair<Integer, Integer>(rowNumInPage, nextRowNum)));
-      rowNumInPage = nextRowNum;
-
-      // Close the page once it has grown past the configured page size.
-      if (posAfterWritten - currentPage.getFirst() > pageSize) {
-        nextPage(posAfterWritten);
-        rowNumInPage = 0;
-      }
-
-      totalRows += tuples.size();
-      return writtenBytes;
-    }
-  }
-
-  /** @return the live appender offset, or the final file length once closed */
-  public long getOffset() throws IOException {
-    if (closed.get()) {
-      return offset;
-    } else {
-      return appender.getOffset();
-    }
-  }
-
-  /** Finishes the current page at {@code pos} and opens a new one starting there. */
-  private void nextPage(long pos) {
-    currentPage.setSecond((int) (pos - currentPage.getFirst()));
-    pages.add(currentPage);
-    currentPage = new Pair<Long, Integer>(pos, 0);
-  }
-
-  /** Always fails; tuples must be added through {@link #addTuples}. */
-  @Override
-  public void addTuple(Tuple t) throws IOException {
-    throw new IOException("Not support addTuple, use addTuples()");
-  }
-
-  @Override
-  public void flush() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-    }
-  }
-
-  /** @return the page size multiplied by the number of finished pages */
-  @Override
-  public long getEstimatedOutputSize() throws IOException {
-    // Widen before multiplying: int * int would overflow for large outputs.
-    return (long) pageSize * pages.size();
-  }
-
-  /**
-   * Flushes and closes the wrapped appender, turning any bytes written since
-   * the last page boundary into a final page. Calling it again is a no-op.
-   */
-  @Override
-  public void close() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-      offset = appender.getOffset();
-      if (offset > currentPage.getFirst()) {
-        nextPage(offset);
-      }
-      appender.close();
-      if (LOG.isDebugEnabled()) {
-        // Logged at debug level to match the guard above.
-        if (!pages.isEmpty()) {
-          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
-              + ", lastPage=" + pages.get(pages.size() - 1));
-        } else {
-          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
-        }
-      }
-      closed.set(true);
-      tableStats = appender.getStats();
-    }
-  }
-
-  /** No-op: statistics are not switched on through this wrapper. */
-  @Override
-  public void enableStats() {
-  }
-
-  /** @return the statistics of the wrapped appender */
-  @Override
-  public TableStats getStats() {
-    synchronized(appender) {
-      return appender.getStats();
-    }
-  }
-
-  /** @return the finished pages as (start offset, length) pairs; the live list, not a copy */
-  public List<Pair<Long, Integer>> getPages() {
-    return pages;
-  }
-
-  /** @return the live per-task index map, not a copy */
-  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
-    return taskTupleIndexes;
-  }
-
-  /** @return a new list holding the index entries of every task still registered */
-  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
-    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-
-    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
-      merged.addAll(eachFailureIndex);
-    }
-
-    return merged;
-  }
-
-  /** Drops the index entries recorded for {@code taskId}. */
-  public void taskFinished(QueryUnitAttemptId taskId) {
-    taskTupleIndexes.remove(taskId);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
deleted file mode 100644
index 84d81d5..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.storage.StorageManager;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Creates, caches and closes {@link HashShuffleAppender}s, keyed first by execution
- * block id and then by partition id.
- */
-public class HashShuffleAppenderManager {
-  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
-
-  /** Appenders per execution block, then per partition id; accessed under synchronized (appenderMap). */
-  private final Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
-      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
-  private final TajoConf systemConf;
-  private final FileSystem defaultFS;
-  private final FileSystem localFS;
-  private final LocalDirAllocator lDirAllocator;
-  /** SHUFFLE_HASH_APPENDER_PAGE_VOLUME multiplied by 1024 * 1024; passed to every HashShuffleAppender. */
-  private final int pageSize;
-
-  /**
-   * @param systemConf configuration used to resolve the file systems, the local
-   *                   directory allocator and the appender page size
-   * @throws IOException if a file system cannot be obtained
-   */
-  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
-    this.systemConf = systemConf;
-
-    // initialize LocalDirAllocator
-    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-
-    // initialize DFS and LocalFileSystems
-    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
-    localFS = FileSystem.getLocal(systemConf);
-    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
-  }
-
-  /**
-   * Returns the appender for the given execution block and partition, creating and
-   * initializing it (including its parent directory) on first request.
-   *
-   * @param tajoConf  configuration used to look up the storage manager
-   * @param ebId      execution block the partition belongs to
-   * @param partId    partition id
-   * @param meta      table meta handed to the storage manager's appender
-   * @param outSchema schema handed to the storage manager's appender
-   * @return the cached or newly created appender
-   * @throws IOException if the data file path, directory or appender cannot be set up
-   */
-  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
-                                         TableMeta meta, Schema outSchema) throws IOException {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
-
-      if (partitionAppenderMap == null) {
-        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
-        appenderMap.put(ebId, partitionAppenderMap);
-      }
-
-      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
-      if (partitionAppenderMeta == null) {
-        Path dataFile = getDataFile(ebId, partId);
-        FileSystem fs = dataFile.getFileSystem(systemConf);
-        if (fs.exists(dataFile)) {
-          FileStatus status = fs.getFileStatus(dataFile);
-          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
-        }
-
-        if (!fs.exists(dataFile.getParent())) {
-          fs.mkdirs(dataFile.getParent());
-        }
-        FileAppender appender = (FileAppender) StorageManager.getStorageManager(
-            tajoConf).getAppender(meta, outSchema, dataFile);
-        appender.enableStats();
-        appender.init();
-
-        partitionAppenderMeta = new PartitionAppenderMeta();
-        partitionAppenderMeta.partId = partId;
-        partitionAppenderMeta.dataFile = dataFile;
-        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
-        partitionAppenderMeta.appender.init();
-        partitionAppenderMap.put(partId, partitionAppenderMeta);
-
-        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
-      }
-
-      return partitionAppenderMeta.appender;
-    }
-  }
-
-  /**
-   * Maps a partition id onto one of the configured parent directories.
-   *
-   * @return {@code partId} modulo the HASH_SHUFFLE_PARENT_DIRS setting
-   */
-  public static int getPartParentId(int partId, TajoConf tajoConf) {
-    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
-  }
-
-  /**
-   * Builds the local path of the shuffle data file for one partition:
-   * {@code <queryId>/output/<ebId>/hash-shuffle/<parentId>/<partId>} under a directory
-   * chosen by the local directory allocator.
-   *
-   * @throws IOException wrapping any failure while resolving the path
-   */
-  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
-    try {
-      // the base dir for an output dir
-      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
-      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
-      //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
-
-      // If an EB has many partitions, too many shuffle files would end up in a single
-      // directory, so files are spread over parent directories.
-      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Closes every appender of the given execution block and describes each closed
-   * partition file.
-   *
-   * The block's appenders are removed from this manager before closing, so a failure
-   * on one appender must not stop the others from being closed: all appenders are
-   * attempted, every failure is logged, and the first failure is rethrown afterwards.
-   *
-   * @param ebId execution block to close
-   * @return one intermediate entry per partition, or {@code null} when no appender
-   *         was registered for {@code ebId}
-   * @throws IOException the first failure raised while closing an appender
-   */
-  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
-    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
-    synchronized (appenderMap) {
-      partitionAppenderMap = appenderMap.remove(ebId);
-    }
-
-    if (partitionAppenderMap == null) {
-      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
-      return null;
-    }
-
-    // Build one intermediate descriptor per partition appender.
-    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
-    IOException firstFailure = null;
-    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
-      try {
-        eachMeta.appender.close();
-        HashShuffleIntermediate intermediate =
-            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
-                eachMeta.appender.getPages(),
-                eachMeta.appender.getMergedTupleIndexes());
-        intermEntries.add(intermediate);
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        // Keep going so the remaining appenders are still closed.
-        if (firstFailure == null) {
-          firstFailure = e;
-        }
-      }
-    }
-
-    if (firstFailure != null) {
-      throw firstFailure;
-    }
-
-    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
-
-    return intermEntries;
-  }
-
-  /**
-   * Notifies every appender of the task's execution block that the task attempt finished.
-   * Does nothing when the execution block has no registered appenders.
-   */
-  public void finalizeTask(QueryUnitAttemptId taskId) {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
-          appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
-      if (partitionAppenderMap == null) {
-        return;
-      }
-
-      for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
-        eachAppender.appender.taskFinished(taskId);
-      }
-    }
-  }
-
-  /** Description of one closed partition file, built by {@link #close(ExecutionBlockId)}. */
-  public static class HashShuffleIntermediate {
-    private final int partId;
-
-    private final long volume;
-
-    //[<page start offset,<task start, task end>>]
-    private final Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
-
-    //[<page start offset, length>]
-    private final List<Pair<Long, Integer>> pages;
-
-    public HashShuffleIntermediate(int partId, long volume,
-                                   List<Pair<Long, Integer>> pages,
-                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
-      this.partId = partId;
-      this.volume = volume;
-      this.failureTskTupleIndexes = failureTskTupleIndexes;
-      this.pages = pages;
-    }
-
-    public int getPartId() {
-      return partId;
-    }
-
-    public long getVolume() {
-      return volume;
-    }
-
-    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
-      return failureTskTupleIndexes;
-    }
-
-    public List<Pair<Long, Integer>> getPages() {
-      return pages;
-    }
-  }
-
-  /** Holder tying a partition id to its appender and data file. */
-  static class PartitionAppenderMeta {
-    int partId;
-    HashShuffleAppender appender;
-    Path dataFile;
-
-    public int getPartId() {
-      return partId;
-    }
-
-    public HashShuffleAppender getAppender() {
-      return appender;
-    }
-
-    public Path getDataFile() {
-      return dataFile;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
deleted file mode 100644
index bfbe478..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-
-import java.util.Arrays;
-
-/**
- * A {@link Tuple} whose fields are held as raw serialized bytes and are turned into
- * {@link Datum}s on first access through {@link #get(int)}. Once a field has been
- * deserialized its datum is cached in {@code values} and its bytes are released.
- */
-public class LazyTuple implements Tuple, Cloneable {
-  private long offset;
-  /** Cache of already materialized datums; {@code null} means "not materialized". */
-  private Datum[] values;
-  /** Pending serialized fields; may be shorter than the schema (see {@link #get(int)}). */
-  private byte[][] textBytes;
-  private Schema schema;
-  private byte[] nullBytes;
-  private SerializerDeserializer serializeDeserialize;
-
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
-    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
-  }
-
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
-    this.schema = schema;
-    this.textBytes = textBytes;
-    this.values = new Datum[schema.size()];
-    this.offset = offset;
-    this.nullBytes = nullBytes;
-    this.serializeDeserialize = serde;
-  }
-
-  /** Copies {@code tuple}, materializing all of its fields; no pending bytes are carried over. */
-  public LazyTuple(LazyTuple tuple) {
-    this.values = tuple.getValues();
-    this.offset = tuple.offset;
-    this.schema = tuple.schema;
-    this.textBytes = new byte[size()][];
-    this.nullBytes = tuple.nullBytes;
-    this.serializeDeserialize = tuple.serializeDeserialize;
-  }
-
-  /** Tells whether serialized bytes are still pending for the field; safe for a short textBytes array. */
-  private boolean hasText(int fieldId) {
-    return fieldId < textBytes.length && textBytes[fieldId] != null;
-  }
-
-  /** Drops pending serialized bytes of the field, if the textBytes array covers it at all. */
-  private void discardText(int fieldId) {
-    if (fieldId < textBytes.length) {
-      textBytes[fieldId] = null;
-    }
-  }
-
-  @Override
-  public int size() {
-    return values.length;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return hasText(fieldid) || values[fieldid] != null;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    // get() yields null for a field that has neither a datum nor pending bytes;
-    // report such a field as null instead of dereferencing it.
-    Datum datum = get(fieldid);
-    return datum == null || datum.isNull();
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return !isNull(fieldid);
-  }
-
-  @Override
-  public void clear() {
-    for (int i = 0; i < values.length; i++) {
-      values[i] = null;
-      discardText(i);
-    }
-  }
-
-  //////////////////////////////////////////////////////
-  // Setter
-  //////////////////////////////////////////////////////
-  @Override
-  public void put(int fieldId, Datum value) {
-    values[fieldId] = value;
-    discardText(fieldId);
-  }
-
-  /**
-   * Stores {@code values} into consecutive fields starting at {@code fieldId}.
-   * Only the overwritten fields lose their pending bytes; fields outside the range
-   * keep theirs and can still be deserialized later.
-   */
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
-      this.values[i] = values[j];
-      discardText(i);
-    }
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
-      values[i] = tuple.get(j);
-      discardText(i);
-    }
-  }
-
-  /** Replaces every field with the first {@code size()} entries of {@code values}. */
-  @Override
-  public void put(Datum[] values) {
-    System.arraycopy(values, 0, this.values, 0, size());
-    // All fields were overwritten, so no pending bytes remain; size by the tuple, not the argument.
-    this.textBytes = new byte[size()][];
-  }
-
-  //////////////////////////////////////////////////////
-  // Getter
-  //////////////////////////////////////////////////////
-  /**
-   * Returns the datum of the field, deserializing its pending bytes on first access.
-   *
-   * @return the cached or freshly deserialized datum; a null datum when the field lies
-   *         beyond the available bytes or deserialization fails; {@code null} when the
-   *         field has neither a datum nor pending bytes
-   */
-  @Override
-  public Datum get(int fieldId) {
-    if (values[fieldId] != null) {
-      return values[fieldId];
-    } else if (textBytes.length <= fieldId) {
-      values[fieldId] = NullDatum.get();  // split error. (col : 3, separator: ',', row text: "a,")
-    } else if (textBytes[fieldId] != null) {
-      try {
-        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
-            textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
-      } catch (Exception e) {
-        // Best effort: a field that cannot be deserialized is treated as a null datum.
-        values[fieldId] = NullDatum.get();
-      }
-      textBytes[fieldId] = null;
-    } else {
-      //non-projection
-    }
-    return values[fieldId];
-  }
-
-  @Override
-  public void setOffset(long offset) {
-    this.offset = offset;
-  }
-
-  @Override
-  public long getOffset() {
-    return this.offset;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return get(fieldId).asBool();
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return get(fieldId).asByte();
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return get(fieldId).asChar();
-  }
-
-  @Override
-  public byte [] getBytes(int fieldId) {
-    return get(fieldId).asByteArray();
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return get(fieldId).asInt2();
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return get(fieldId).asInt4();
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return get(fieldId).asInt8();
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return get(fieldId).asFloat4();
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return get(fieldId).asFloat8();
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return get(fieldId).asChars();
-  }
-
-  @Override
-  public ProtobufDatum getProtobufDatum(int fieldId) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public IntervalDatum getInterval(int fieldId) {
-    return (IntervalDatum) get(fieldId);
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    return get(fieldId).asUnicodeChars();
-  }
-
-  /** Renders the non-null fields as {@code (index=>datum, ...)}; materializes every field. */
-  public String toString() {
-    boolean first = true;
-    StringBuilder str = new StringBuilder();
-    str.append("(");
-    Datum d;
-    for (int i = 0; i < values.length; i++) {
-      d = get(i);
-      if (d != null) {
-        if (first) {
-          first = false;
-        } else {
-          str.append(", ");
-        }
-        str.append(i)
-            .append("=>")
-            .append(d);
-      }
-    }
-    str.append(")");
-    return str.toString();
-  }
-
-  @Override
-  public int hashCode() {
-    // Hash the materialized fields, exactly what equals() compares; hashing the raw
-    // cache would make the hash depend on which fields happened to be read already.
-    return Arrays.hashCode(getValues());
-  }
-
-  /** Returns a new array holding the datum of every field, materializing each one. */
-  @Override
-  public Datum[] getValues() {
-    Datum[] datums = new Datum[values.length];
-    for (int i = 0; i < values.length; i++) {
-      datums[i] = get(i);
-    }
-    return datums;
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    LazyTuple lazyTuple = (LazyTuple) super.clone();
-
-    lazyTuple.values = getValues(); //shallow copy
-    lazyTuple.textBytes = new byte[size()][];
-    return lazyTuple;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof Tuple) {
-      Tuple other = (Tuple) obj;
-      return Arrays.equals(getValues(), other.getValues());
-    }
-    return false;
-  }
-}