Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:24 UTC
[10/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
new file mode 100644
index 0000000..532d9a2
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import parquet.Log;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.String.format;
+import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
+import static parquet.Log.DEBUG;
+import static parquet.Preconditions.checkNotNull;
+
+class InternalParquetRecordWriter<T> {
+ private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
+
+ private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+ private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+
+ private final ParquetFileWriter w;
+ private final WriteSupport<T> writeSupport;
+ private final MessageType schema;
+ private final Map<String, String> extraMetaData;
+ private final int blockSize;
+ private final int pageSize;
+ private final BytesCompressor compressor;
+ private final int dictionaryPageSize;
+ private final boolean enableDictionary;
+ private final boolean validating;
+ private final WriterVersion writerVersion;
+
+ private long recordCount = 0;
+ private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+ private ColumnWriteStoreImpl store;
+ private ColumnChunkPageWriteStore pageStore;
+
+ /**
+ * @param w the file to write to
+ * @param writeSupport the class to convert incoming records
+ * @param schema the schema of the records
+ * @param extraMetaData extra meta data to write in the footer of the file
+ * @param blockSize the size of a block in the file (this will be approximate)
+ * @param codec the codec used to compress
+ */
+ public InternalParquetRecordWriter(
+ ParquetFileWriter w,
+ WriteSupport<T> writeSupport,
+ MessageType schema,
+ Map<String, String> extraMetaData,
+ int blockSize,
+ int pageSize,
+ BytesCompressor compressor,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ WriterVersion writerVersion) {
+ this.w = w;
+ this.writeSupport = checkNotNull(writeSupport, "writeSupport");
+ this.schema = schema;
+ this.extraMetaData = extraMetaData;
+ this.blockSize = blockSize;
+ this.pageSize = pageSize;
+ this.compressor = compressor;
+ this.dictionaryPageSize = dictionaryPageSize;
+ this.enableDictionary = enableDictionary;
+ this.validating = validating;
+ this.writerVersion = writerVersion;
+ initStore();
+ }
+
+ private void initStore() {
+ // we don't want this number to be too small
+ // ideally we divide the block equally across the columns
+ // it is unlikely all columns are going to be the same size.
+ int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
+ pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
+ // we don't want this number to be too small either
+ // ideally, slightly bigger than the page size, but not bigger than the block buffer
+ int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
+ store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+ MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
+ writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
+ }
+
+ public void close() throws IOException, InterruptedException {
+ flushStore();
+ w.end(extraMetaData);
+ }
+
+ public void write(T value) throws IOException, InterruptedException {
+ writeSupport.write(value);
+ ++ recordCount;
+ checkBlockSizeReached();
+ }
+
+ private void checkBlockSizeReached() throws IOException {
+ if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
+ long memSize = store.memSize();
+ if (memSize > blockSize) {
+ LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
+ flushStore();
+ initStore();
+ recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
+ } else {
+ float recordSize = (float) memSize / recordCount;
+ recordCountForNextMemCheck = min(
+ max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
+ recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
+ );
+ if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
+ }
+ }
+ }
+
+ public long getEstimatedWrittenSize() throws IOException {
+ return w.getPos() + store.memSize();
+ }
+
+ private void flushStore()
+ throws IOException {
+ LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
+ if (store.allocatedSize() > 3 * blockSize) {
+ LOG.warn("Too much memory used: " + store.memUsageString());
+ }
+ w.startBlock(recordCount);
+ store.flush();
+ pageStore.flushToFileWriter(w);
+ recordCount = 0;
+ w.endBlock();
+ store = null;
+ pageStore = null;
+ }
+}
\ No newline at end of file
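
The block-size check in InternalParquetRecordWriter is adaptive: instead of measuring memory on every record, it estimates the average record size, projects when the block threshold will be hit, and schedules the next check halfway to that projection, never less than 100 nor more than 10,000 records ahead. A standalone sketch of that arithmetic (class and method names are illustrative; only the math mirrors checkBlockSizeReached() above):

    // A minimal sketch of the next-check estimate in checkBlockSizeReached().
    public class MemCheckEstimate {
      static final long MIN_CHECK = 100, MAX_CHECK = 10000;

      static long nextCheck(long recordCount, long memSize, long blockSize) {
        float recordSize = (float) memSize / recordCount;        // average bytes per record so far
        long projectedFlush = recordCount + (long) (blockSize / recordSize);
        return Math.min(
            Math.max(MIN_CHECK, projectedFlush / 2),             // check halfway to the projection
            recordCount + MAX_CHECK);                            // look at most 10,000 records ahead
      }

      public static void main(String[] args) {
        // 1,000 records have used 1 MB of a 128 MB block: the halfway point
        // would be record 64,500, but the 10,000-record cap wins.
        System.out.println(nextCheck(1000, 1 << 20, 128 << 20)); // prints 11000
      }
    }
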
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
new file mode 100644
index 0000000..ac1c421
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.Version;
+import parquet.bytes.BytesInput;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.DictionaryPage;
+import parquet.column.statistics.Statistics;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.metadata.*;
+import parquet.io.ParquetEncodingException;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.Map.Entry;
+
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
+/**
+ * Internal implementation of the Parquet file writer as a block container
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetFileWriter {
+ private static final Log LOG = Log.getLog(ParquetFileWriter.class);
+
+ public static final String PARQUET_METADATA_FILE = "_metadata";
+ public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
+ public static final int CURRENT_VERSION = 1;
+
+ private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+
+ private final MessageType schema;
+ private final FSDataOutputStream out;
+ private BlockMetaData currentBlock;
+ private ColumnChunkMetaData currentColumn;
+ private long currentRecordCount;
+ private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ private long uncompressedLength;
+ private long compressedLength;
+ private Set<parquet.column.Encoding> currentEncodings;
+
+ private CompressionCodecName currentChunkCodec;
+ private ColumnPath currentChunkPath;
+ private PrimitiveTypeName currentChunkType;
+ private long currentChunkFirstDataPage;
+ private long currentChunkDictionaryPageOffset;
+ private long currentChunkValueCount;
+
+ private Statistics currentStatistics;
+
+ /**
+ * Captures the order in which methods should be called
+ *
+ * @author Julien Le Dem
+ *
+ */
+ private enum STATE {
+ NOT_STARTED {
+ STATE start() {
+ return STARTED;
+ }
+ },
+ STARTED {
+ STATE startBlock() {
+ return BLOCK;
+ }
+ STATE end() {
+ return ENDED;
+ }
+ },
+ BLOCK {
+ STATE startColumn() {
+ return COLUMN;
+ }
+ STATE endBlock() {
+ return STARTED;
+ }
+ },
+ COLUMN {
+ STATE endColumn() {
+ return BLOCK;
+ }
+ STATE write() {
+ return this;
+ }
+ },
+ ENDED;
+
+ STATE start() throws IOException { return error(); }
+ STATE startBlock() throws IOException { return error(); }
+ STATE startColumn() throws IOException { return error(); }
+ STATE write() throws IOException { return error(); }
+ STATE endColumn() throws IOException { return error(); }
+ STATE endBlock() throws IOException { return error(); }
+ STATE end() throws IOException { return error(); }
+
+ private final STATE error() throws IOException {
+ throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
+ }
+ }
+
+ private STATE state = STATE.NOT_STARTED;
+
+ /**
+ *
+ * @param configuration Configuration
+ * @param schema the schema of the data
+ * @param file the file to write to
+ * @throws java.io.IOException if the file can not be created
+ */
+ public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
+ super();
+ this.schema = schema;
+ FileSystem fs = file.getFileSystem(configuration);
+ this.out = fs.create(file, false);
+ }
+
+ /**
+ * start the file
+ * @throws java.io.IOException
+ */
+ public void start() throws IOException {
+ state = state.start();
+ if (DEBUG) LOG.debug(out.getPos() + ": start");
+ out.write(MAGIC);
+ }
+
+ /**
+ * start a block
+ * @param recordCount the record count in this block
+ * @throws java.io.IOException
+ */
+ public void startBlock(long recordCount) throws IOException {
+ state = state.startBlock();
+ if (DEBUG) LOG.debug(out.getPos() + ": start block");
+// out.write(MAGIC); // TODO: add a magic delimiter
+ currentBlock = new BlockMetaData();
+ currentRecordCount = recordCount;
+ }
+
+ /**
+ * start a column inside a block
+ * @param descriptor the column descriptor
+ * @param valueCount the value count in this column
+ * @param compressionCodecName the codec used to compress this column chunk
+ * @throws java.io.IOException
+ */
+ public void startColumn(ColumnDescriptor descriptor,
+ long valueCount,
+ CompressionCodecName compressionCodecName) throws IOException {
+ state = state.startColumn();
+ if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
+ currentEncodings = new HashSet<parquet.column.Encoding>();
+ currentChunkPath = ColumnPath.get(descriptor.getPath());
+ currentChunkType = descriptor.getType();
+ currentChunkCodec = compressionCodecName;
+ currentChunkValueCount = valueCount;
+ currentChunkFirstDataPage = out.getPos();
+ compressedLength = 0;
+ uncompressedLength = 0;
+ // need to know what type of stats to initialize to
+ // better way to do this?
+ currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
+ }
+
+ /**
+ * writes a dictionary page
+ * @param dictionaryPage the dictionary page
+ * @throws java.io.IOException
+ */
+ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+ state = state.write();
+ if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
+ currentChunkDictionaryPageOffset = out.getPos();
+ int uncompressedSize = dictionaryPage.getUncompressedSize();
+ int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ out);
+ long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+ this.uncompressedLength += uncompressedSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
+ dictionaryPage.getBytes().writeAllTo(out);
+ currentEncodings.add(dictionaryPage.getEncoding());
+ }
+
+
+ /**
+ * writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ */
+ @Deprecated
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ parquet.column.Encoding rlEncoding,
+ parquet.column.Encoding dlEncoding,
+ parquet.column.Encoding valuesEncoding) throws IOException {
+ state = state.write();
+ long beforeHeader = out.getPos();
+ if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
+ int compressedPageSize = (int)bytes.size();
+ metadataConverter.writeDataPageHeader(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out);
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
+ bytes.writeAllTo(out);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ }
+
+ /**
+ * writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param statistics the statistics of the page
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ */
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics statistics,
+ parquet.column.Encoding rlEncoding,
+ parquet.column.Encoding dlEncoding,
+ parquet.column.Encoding valuesEncoding) throws IOException {
+ state = state.write();
+ long beforeHeader = out.getPos();
+ if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
+ int compressedPageSize = (int)bytes.size();
+ metadataConverter.writeDataPageHeader(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ statistics,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out);
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
+ bytes.writeAllTo(out);
+ currentStatistics.mergeStatistics(statistics);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ }
+
+ /**
+ * writes a number of pages at once
+ * @param bytes bytes to be written including page headers
+ * @param uncompressedTotalPageSize total uncompressed size (without page headers)
+ * @param compressedTotalPageSize total compressed size (without page headers)
+ * @throws java.io.IOException
+ */
+ void writeDataPages(BytesInput bytes,
+ long uncompressedTotalPageSize,
+ long compressedTotalPageSize,
+ Statistics totalStats,
+ List<parquet.column.Encoding> encodings) throws IOException {
+ state = state.write();
+ if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
+ long headersSize = bytes.size() - compressedTotalPageSize;
+ this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+ this.compressedLength += compressedTotalPageSize + headersSize;
+ if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
+ bytes.writeAllTo(out);
+ currentEncodings.addAll(encodings);
+ currentStatistics = totalStats;
+ }
+
+ /**
+ * end a column (once all rep, def and data have been written)
+ * @throws java.io.IOException
+ */
+ public void endColumn() throws IOException {
+ state = state.endColumn();
+ if (DEBUG) LOG.debug(out.getPos() + ": end column");
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ currentChunkPath,
+ currentChunkType,
+ currentChunkCodec,
+ currentEncodings,
+ currentStatistics,
+ currentChunkFirstDataPage,
+ currentChunkDictionaryPageOffset,
+ currentChunkValueCount,
+ compressedLength,
+ uncompressedLength));
+ if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
+ currentColumn = null;
+ this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+ this.uncompressedLength = 0;
+ this.compressedLength = 0;
+ }
+
+ /**
+ * ends a block once all column chunks have been written
+ * @throws java.io.IOException
+ */
+ public void endBlock() throws IOException {
+ state = state.endBlock();
+ if (DEBUG) LOG.debug(out.getPos() + ": end block");
+ currentBlock.setRowCount(currentRecordCount);
+ blocks.add(currentBlock);
+ currentBlock = null;
+ }
+
+ /**
+ * ends a file once all blocks have been written.
+ * closes the file.
+ * @param extraMetaData the extra meta data to write in the footer
+ * @throws java.io.IOException
+ */
+ public void end(Map<String, String> extraMetaData) throws IOException {
+ state = state.end();
+ if (DEBUG) LOG.debug(out.getPos() + ": end");
+ ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+ serializeFooter(footer, out);
+ out.close();
+ }
+
+ private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
+ long footerIndex = out.getPos();
+ parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
+ writeFileMetaData(parquetMetadata, out);
+ if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
+ BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
+ out.write(MAGIC);
+ }
+
+ /**
+ * writes a _metadata file
+ * @param configuration the configuration to use to get the FileSystem
+ * @param outputPath the directory to write the _metadata file to
+ * @param footers the list of footers to merge
+ * @throws java.io.IOException
+ */
+ public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
+ Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
+ FileSystem fs = outputPath.getFileSystem(configuration);
+ outputPath = outputPath.makeQualified(fs);
+ FSDataOutputStream metadata = fs.create(metaDataPath);
+ metadata.write(MAGIC);
+ ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
+ serializeFooter(metadataFooter, metadata);
+ metadata.close();
+ }
+
+ private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
+ String rootPath = root.toString();
+ GlobalMetaData fileMetaData = null;
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ for (Footer footer : footers) {
+ String path = footer.getFile().toString();
+ if (!path.startsWith(rootPath)) {
+ throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root);
+ }
+ path = path.substring(rootPath.length());
+ while (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
+ for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+ block.setPath(path);
+ blocks.add(block);
+ }
+ }
+ return new ParquetMetadata(fileMetaData.merge(), blocks);
+ }
+
+ /**
+ * @return the current position in the underlying file
+ * @throws java.io.IOException
+ */
+ public long getPos() throws IOException {
+ return out.getPos();
+ }
+
+ /**
+ * Will merge the metadata of all the footers together
+ * @param footers the list of footers to merge
+ * @return the global meta data for all the footers
+ */
+ static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
+ GlobalMetaData fileMetaData = null;
+ for (Footer footer : footers) {
+ ParquetMetadata currentMetadata = footer.getParquetMetadata();
+ fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
+ }
+ return fileMetaData;
+ }
+
+ /**
+ * Will return the result of merging toMerge into mergedMetadata
+ * @param toMerge the metadata to merge
+ * @param mergedMetadata the reference metadata to merge into
+ * @return the result of the merge
+ */
+ static GlobalMetaData mergeInto(
+ FileMetaData toMerge,
+ GlobalMetaData mergedMetadata) {
+ MessageType schema = null;
+ Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
+ Set<String> createdBy = new HashSet<String>();
+ if (mergedMetadata != null) {
+ schema = mergedMetadata.getSchema();
+ newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
+ createdBy.addAll(mergedMetadata.getCreatedBy());
+ }
+ if ((schema == null && toMerge.getSchema() != null)
+ || (schema != null && !schema.equals(toMerge.getSchema()))) {
+ schema = mergeInto(toMerge.getSchema(), schema);
+ }
+ for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
+ Set<String> values = newKeyValues.get(entry.getKey());
+ if (values == null) {
+ values = new HashSet<String>();
+ newKeyValues.put(entry.getKey(), values);
+ }
+ values.add(entry.getValue());
+ }
+ createdBy.add(toMerge.getCreatedBy());
+ return new GlobalMetaData(
+ schema,
+ newKeyValues,
+ createdBy);
+ }
+
+ /**
+ * Will return the result of merging toMerge into mergedSchema
+ * @param toMerge the schema to merge into mergedSchema
+ * @param mergedSchema the schema to append the fields to
+ * @return the resulting schema
+ */
+ static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
+ if (mergedSchema == null) {
+ return toMerge;
+ }
+ return mergedSchema.union(toMerge);
+ }
+
+}
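
The STATE enum above encodes the only legal call sequence on ParquetFileWriter. As a hedged sketch (the schema, column descriptor, page bytes, and statistics are assumed to be prepared by the caller; PLAIN encodings are illustrative), one block with a single column chunk is written like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.storage.thirdparty.parquet.ParquetFileWriter;
    import parquet.bytes.BytesInput;
    import parquet.column.ColumnDescriptor;
    import parquet.column.Encoding;
    import parquet.column.statistics.Statistics;
    import parquet.hadoop.metadata.CompressionCodecName;
    import parquet.schema.MessageType;

    import java.io.IOException;
    import java.util.HashMap;

    public class FileWriterSketch {
      static void writeOneChunk(Configuration conf, MessageType schema, Path file,
                                ColumnDescriptor desc, BytesInput pageBytes,
                                Statistics stats, int valueCount,
                                int uncompressedSize) throws IOException {
        ParquetFileWriter w = new ParquetFileWriter(conf, schema, file);
        w.start();                     // NOT_STARTED -> STARTED, writes the PAR1 magic
        w.startBlock(valueCount);      // STARTED -> BLOCK
        w.startColumn(desc, valueCount, CompressionCodecName.UNCOMPRESSED); // -> COLUMN
        w.writeDataPage(valueCount, uncompressedSize, pageBytes, stats,
            Encoding.PLAIN, Encoding.PLAIN, Encoding.PLAIN);  // stays in COLUMN
        w.endColumn();                 // COLUMN -> BLOCK
        w.endBlock();                  // BLOCK -> STARTED
        w.end(new HashMap<String, String>()); // STARTED -> ENDED, writes footer, closes file
      }
    }
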
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
new file mode 100644
index 0000000..9c167a0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.api.ReadSupport.ReadContext;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.GlobalMetaData;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Read records from a Parquet file.
+ */
+public class ParquetReader<T> implements Closeable {
+
+ private ReadSupport<T> readSupport;
+ private UnboundRecordFilter filter;
+ private Configuration conf;
+ private ReadContext readContext;
+ private Iterator<Footer> footersIterator;
+ private InternalParquetRecordReader<T> reader;
+ private GlobalMetaData globalMetaData;
+
+ /**
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
+ this(file, readSupport, null);
+ }
+
+ /**
+ * @param conf the configuration
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
+ this(conf, file, readSupport, null);
+ }
+
+ /**
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @param filter the filter to use to filter records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+ this(new Configuration(), file, readSupport, filter);
+ }
+
+ /**
+ * @param conf the configuration
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @param filter the filter to use to filter records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+ this.readSupport = readSupport;
+ this.filter = filter;
+ this.conf = conf;
+
+ FileSystem fs = file.getFileSystem(conf);
+ List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
+ List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+ this.footersIterator = footers.iterator();
+ globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
+
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ for (Footer footer : footers) {
+ blocks.addAll(footer.getParquetMetadata().getBlocks());
+ }
+
+ MessageType schema = globalMetaData.getSchema();
+ Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
+ readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
+ }
+
+ /**
+ * @return the next record or null if finished
+ * @throws java.io.IOException
+ */
+ public T read() throws IOException {
+ try {
+ if (reader != null && reader.nextKeyValue()) {
+ return reader.getCurrentValue();
+ } else {
+ initReader();
+ return reader == null ? null : read();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void initReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ if (footersIterator.hasNext()) {
+ Footer footer = footersIterator.next();
+ reader = new InternalParquetRecordReader<T>(readSupport, filter);
+ reader.initialize(
+ readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
+ readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+}
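
ParquetReader is driven as a simple pull loop; read() transparently advances across footers and returns null at end of input. A sketch (the ReadSupport implementation is assumed, e.g. Tajo's own support class elsewhere in this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
    import parquet.hadoop.api.ReadSupport;

    import java.io.IOException;

    public class ReaderSketch {
      static <T> long countRecords(Configuration conf, Path file,
                                   ReadSupport<T> readSupport) throws IOException {
        ParquetReader<T> reader = new ParquetReader<T>(conf, file, readSupport);
        try {
          long n = 0;
          while (reader.read() != null) {  // null once all footers are exhausted
            n++;
          }
          return n;
        } finally {
          reader.close();
        }
      }
    }
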
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
new file mode 100644
index 0000000..7527437
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.column.ParquetProperties;
+import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class ParquetWriter<T> implements Closeable {
+
+ public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
+ public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+ public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
+ CompressionCodecName.UNCOMPRESSED;
+ public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+ public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
+ public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
+ ParquetProperties.WriterVersion.PARQUET_1_0;
+
+ private final InternalParquetRecordWriter<T> writer;
+
+ /**
+ * Create a new ParquetWriter.
+ * (with dictionary encoding enabled and validation off)
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean)
+ */
+ public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+ DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold (both data and dictionary)
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ boolean enableDictionary,
+ boolean validating) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+ dictionaryPageSize, enableDictionary, validating,
+ DEFAULT_WRITER_VERSION);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
+ * configuration from the classpath.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ ParquetProperties.WriterVersion writerVersion) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
+ * @param conf Hadoop configuration to use while accessing the filesystem
+ * @throws java.io.IOException
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ ParquetProperties.WriterVersion writerVersion,
+ Configuration conf) throws IOException {
+
+ WriteSupport.WriteContext writeContext = writeSupport.init(conf);
+ MessageType schema = writeContext.getSchema();
+
+ ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
+ fileWriter.start();
+
+ CodecFactory codecFactory = new CodecFactory(conf);
+ CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
+ this.writer = new InternalParquetRecordWriter<T>(
+ fileWriter,
+ writeSupport,
+ schema,
+ writeContext.getExtraMetaData(),
+ blockSize,
+ pageSize,
+ compressor,
+ dictionaryPageSize,
+ enableDictionary,
+ validating,
+ writerVersion);
+ }
+
+ /**
+ * Create a new ParquetWriter. The default block size is 128 MB, the default
+ * page size is 1 MB, and compression is off. Dictionary encoding is enabled by default.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @throws java.io.IOException
+ */
+ public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
+ this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+ }
+
+ public void write(T object) throws IOException {
+ try {
+ writer.write(object);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public long getEstimatedWrittenSize() throws IOException {
+ return this.writer.getEstimatedWrittenSize();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ writer.close();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+}
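
From the caller's side only this facade is visible; the WriteSupport passed in converts each record for the underlying RecordConsumer. A usage sketch (the write support and record list are assumed):

    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
    import parquet.hadoop.api.WriteSupport;
    import parquet.hadoop.metadata.CompressionCodecName;

    import java.io.IOException;
    import java.util.List;

    public class WriterSketch {
      static <T> void writeAll(Path file, WriteSupport<T> writeSupport,
                               List<T> records) throws IOException {
        // Snappy compression with the default 128 MB blocks and 1 MB pages.
        ParquetWriter<T> writer = new ParquetWriter<T>(file, writeSupport,
            CompressionCodecName.SNAPPY,
            ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
        try {
          for (T record : records) {
            writer.write(record);
          }
        } finally {
          writer.close();
        }
      }
    }
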
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000..ce9aab6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage";
+option java_outer_classname = "StorageFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message FileFragmentProto {
+ required string id = 1;
+ required string path = 2;
+ required int64 startOffset = 3;
+ required int64 length = 4;
+ repeated string hosts = 5;
+ repeated int32 diskIds = 6;
+}
\ No newline at end of file
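
Once protoc has compiled the message, callers get the standard generated builder API. A sketch with illustrative values:

    import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto;

    public class FragmentProtoSketch {
      public static void main(String[] args) {
        // A fragment covering the first 64 MB of a file, replicated on two hosts.
        FileFragmentProto fragment = FileFragmentProto.newBuilder()
            .setId("t1_fragment_0")
            .setPath("hdfs://localhost/tajo/warehouse/t1/data.csv")
            .setStartOffset(0L)
            .setLength(64L * 1024 * 1024)
            .addHosts("host1")
            .addHosts("host2")
            .addDiskIds(0)
            .build();
        System.out.println(fragment.getPath() + " [" + fragment.getStartOffset()
            + ", +" + fragment.getLength() + ")");
      }
    }
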
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
new file mode 100644
index 0000000..cf8a54e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+public class HttpFileServer {
+ private final static Log LOG = LogFactory.getLog(HttpFileServer.class);
+
+ private final InetSocketAddress addr;
+ private InetSocketAddress bindAddr;
+ private ServerBootstrap bootstrap = null;
+ private ChannelFactory factory = null;
+ private ChannelGroup channelGroup = null;
+
+ public HttpFileServer(final InetSocketAddress addr) {
+ this.addr = addr;
+ this.factory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+ 2);
+
+ // Configure the server.
+ this.bootstrap = new ServerBootstrap(factory);
+ // Set up the event pipeline factory.
+ this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
+ this.channelGroup = new DefaultChannelGroup();
+ }
+
+ public HttpFileServer(String bindaddr) {
+ this(NetUtils.createSocketAddr(bindaddr));
+ }
+
+ public void start() {
+ // Bind and start to accept incoming connections.
+ Channel channel = bootstrap.bind(addr);
+ channelGroup.add(channel);
+ this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+ LOG.info("HttpFileServer starts up ("
+ + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
+ + ")");
+ }
+
+ public InetSocketAddress getBindAddress() {
+ return this.bindAddr;
+ }
+
+ public void stop() {
+ ChannelGroupFuture future = channelGroup.close();
+ future.awaitUninterruptibly();
+ factory.releaseExternalResources();
+
+ LOG.info("HttpFileServer shutdown ("
+ + this.bindAddr.getAddress().getHostAddress() + ":"
+ + this.bindAddr.getPort() + ")");
+ }
+}
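
In tests the server is bound to an ephemeral port and the real address is read back after start(). A minimal sketch:

    import org.apache.tajo.HttpFileServer;

    import java.net.InetSocketAddress;

    public class HttpFileServerSketch {
      public static void main(String[] args) {
        HttpFileServer server = new HttpFileServer("localhost:0"); // port 0: pick any free port
        server.start();
        InetSocketAddress addr = server.getBindAddress();
        System.out.println("serving at http://" + addr.getAddress().getHostAddress()
            + ":" + addr.getPort());
        server.stop();
      }
    }
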
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
new file mode 100644
index 0000000..6c77317
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * This is an implementation copied from HttpStaticFileServerHandler.java of Netty 3.6
+ */
+public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ HttpRequest request = (HttpRequest) e.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ final String path = sanitizeUri(request.getUri());
+ if (path == null) {
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+
+ File file = new File(path);
+ if (file.isHidden() || !file.exists()) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ if (!file.isFile()) {
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+
+ RandomAccessFile raf;
+ try {
+ raf = new RandomAccessFile(file, "r");
+ } catch (FileNotFoundException fnfe) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ long fileLength = raf.length();
+
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ setContentLength(response, fileLength);
+ setContentTypeHeader(response);
+
+ Channel ch = e.getChannel();
+
+ // Write the initial line and the header.
+ ch.write(response);
+
+ // Write the content.
+ ChannelFuture writeFuture;
+ if (ch.getPipeline().get(SslHandler.class) != null) {
+ // Cannot use zero-copy with HTTPS.
+ writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
+ } else {
+ // No encryption - use zero-copy.
+ final FileRegion region =
+ new DefaultFileRegion(raf.getChannel(), 0, fileLength);
+ writeFuture = ch.write(region);
+ writeFuture.addListener(new ChannelFutureProgressListener() {
+ public void operationComplete(ChannelFuture future) {
+ region.releaseExternalResources();
+ }
+
+ public void operationProgressed(
+ ChannelFuture future, long amount, long current, long total) {
+ System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
+ }
+ });
+ }
+
+ // Decide whether to close the connection or not.
+ if (!isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ if (cause instanceof TooLongFrameException) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ cause.printStackTrace();
+ if (ch.isConnected()) {
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private static String sanitizeUri(String uri) {
+ // Decode the path.
+ try {
+ uri = URLDecoder.decode(uri, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ try {
+ uri = URLDecoder.decode(uri, "ISO-8859-1");
+ } catch (UnsupportedEncodingException e1) {
+ throw new Error();
+ }
+ }
+
+ // Convert file separators.
+ uri = uri.replace('/', File.separatorChar);
+
+ // Simplistic security check; a production server would need something
+ // more thorough than this.
+ if (uri.contains(File.separator + '.') ||
+ uri.contains('.' + File.separator) ||
+ uri.startsWith(".") || uri.endsWith(".")) {
+ return null;
+ }
+
+ return uri;
+ }
+
+ private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setContent(ChannelBuffers.copiedBuffer(
+ "Failure: " + status.toString() + "\r\n",
+ CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ /**
+ * Sets the content type header for the HTTP Response
+ *
+ * @param response
+ * HTTP response
+ */
+ private static void setContentTypeHeader(HttpResponse response) {
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ }
+
+}
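
The dot-segment rule in sanitizeUri() is easiest to see restated on its own (the helper name is illustrative; the logic is the same as above, applied after separators are converted):

    import java.io.File;

    public class UriCheckSketch {
      static boolean isForbidden(String uri) {
        return uri.contains(File.separator + '.')
            || uri.contains('.' + File.separator)
            || uri.startsWith(".") || uri.endsWith(".");
      }

      public static void main(String[] args) {
        System.out.println(isForbidden("/data/file.txt"));  // false: ordinary path
        System.out.println(isForbidden("/data/../secret")); // true: dot segment rejected
      }
    }
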
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
new file mode 100644
index 0000000..cecf93b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+// Uncomment the following lines if you want HTTPS
+//import javax.net.ssl.SSLEngine;
+//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
+//import org.jboss.netty.handler.ssl.SslHandler;
+
+// This class is copied from HttpStaticFileServerPipelineFactory.java of Netty 3.6
+public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = pipeline();
+
+ // Uncomment the following lines if you want HTTPS
+ //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ //engine.setUseClientMode(false);
+ //pipeline.addLast("ssl", new SslHandler(engine));
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+
+ pipeline.addLast("handler", new HttpFileServerHandler());
+ return pipeline;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
new file mode 100644
index 0000000..3c78d6b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestCompressionStorages {
+ private TajoConf conf;
+ private static final String TEST_PATH = "target/test-data/TestCompressionStorages";
+
+ private StoreType storeType;
+ private Path testDir;
+ private FileSystem fs;
+
+ public TestCompressionStorages(StoreType type) throws IOException {
+ this.storeType = type;
+ conf = new TajoConf();
+
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][]{
+ {StoreType.CSV},
+ {StoreType.RCFILE},
+ {StoreType.SEQUENCEFILE},
+ {StoreType.TEXTFILE}
+ });
+ }
+
+ @Test
+ public void testDeflateCodecCompressionData() throws IOException {
+ storageCompressionTest(storeType, DeflateCodec.class);
+ }
+
+ @Test
+ public void testGzipCodecCompressionData() throws IOException {
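+ // RCFile and SequenceFile gzip support depends on the native zlib library.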
+ if (storeType == StoreType.RCFILE || storeType == StoreType.SEQUENCEFILE) {
+ if (ZlibFactory.isNativeZlibLoaded(conf)) {
+ storageCompressionTest(storeType, GzipCodec.class);
+ }
+ } else {
+ storageCompressionTest(storeType, GzipCodec.class);
+ }
+ }
+
+ @Test
+ public void testSnappyCodecCompressionData() throws IOException {
+ if (SnappyCodec.isNativeCodeLoaded()) {
+ storageCompressionTest(storeType, SnappyCodec.class);
+ }
+ }
+
+ @Test
+ public void testLz4CodecCompressionData() throws IOException {
+ if (NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) {
+ storageCompressionTest(storeType, Lz4Codec.class);
+ }
+ }
+
+ private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.FLOAT4);
+ schema.addColumn("name", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
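+ // Select the codec under test; the serde options only take effect for RCFile and SequenceFile.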
+ meta.putOption("compression.codec", codec.getCanonicalName());
+ meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name());
+ meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
+ meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());
+
+ String fileName = "Compression_" + codec.getSimpleName();
+ Path tablePath = new Path(testDir, fileName);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+ appender.enableStats();
+
+ appender.init();
+
+ String extension = "";
+ if (appender instanceof CSVFile.CSVAppender) {
+ extension = ((CSVFile.CSVAppender) appender).getExtension();
+ } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) {
+ extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
+ }
+
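+ // Write 100,000 rows through the appender with stats enabled, then verify them on read.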
+ int tupleNum = 100000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(3);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createFloat4((float) i));
+ vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+
+ TableStats stat = appender.getStats();
+ assertEquals(tupleNum, stat.getNumRows().longValue());
+ tablePath = tablePath.suffix(extension);
+ FileStatus status = fs.getFileStatus(tablePath);
+ long fileLen = status.getLen();
+ FileFragment[] tablets = new FileFragment[1];
+ tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
+
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+
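+ // For text files, only codecs implementing SplittableCompressionCodec allow mid-file splits.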
+ if (StoreType.CSV == storeType) {
+ if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
+ assertTrue(scanner.isSplittable());
+ } else {
+ assertFalse(scanner.isSplittable());
+ }
+ }
+ scanner.init();
+
+ if (storeType == StoreType.SEQUENCEFILE) {
+ assertTrue(scanner instanceof SequenceFileScanner);
+ Writable key = ((SequenceFileScanner) scanner).getKey();
+ assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+ }
+
+ int tupleCnt = 0;
+ Tuple tuple;
+ while ((tuple = scanner.next()) != null) {
+ tupleCnt++;
+ }
+ scanner.close();
+ assertEquals(tupleNum, tupleCnt);
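+ // The on-disk (compressed) byte count should differ from what the scanner reports,
+ // while the row counts must match exactly.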
+ assertNotEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
new file mode 100644
index 0000000..8749925
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+public class TestDelimitedTextFile {
+
+ private static Schema schema = new Schema();
+
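+ // Every well-formed line in the test data is expected to deserialize to this tuple.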
+ private static Tuple baseTuple = new VTuple(10);
+
+ static {
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.CHAR, 7);
+ schema.addColumn("col3", Type.INT2);
+ schema.addColumn("col4", Type.INT4);
+ schema.addColumn("col5", Type.INT8);
+ schema.addColumn("col6", Type.FLOAT4);
+ schema.addColumn("col7", Type.FLOAT8);
+ schema.addColumn("col8", Type.TEXT);
+ schema.addColumn("col9", Type.BLOB);
+ schema.addColumn("col10", Type.INET4);
+
+ baseTuple.put(new Datum[] {
+ DatumFactory.createBool(true), // 0
+ DatumFactory.createChar("hyunsik"), // 1
+ DatumFactory.createInt2((short) 17), // 2
+ DatumFactory.createInt4(59), // 3
+ DatumFactory.createInt8(23L), // 4
+ DatumFactory.createFloat4(77.9f), // 5
+ DatumFactory.createFloat8(271.9d), // 6
+ DatumFactory.createText("hyunsik"), // 7
+ DatumFactory.createBlob("hyunsik".getBytes()),// 8
+ DatumFactory.createInet4("192.168.0.1"), // 9
+ });
+ }
+
+ public static Path getResourcePath(String path, String suffix) {
+ URL resultBaseURL = ClassLoader.getSystemResource(path);
+ return new Path(resultBaseURL.toString(), suffix);
+ }
+
+ public static Path getResultPath(Class<?> clazz, String fileName) {
+ return new Path(getResourcePath("results", clazz.getSimpleName()), fileName);
+ }
+
+ public static String getResultText(Class<?> clazz, String fileName) throws IOException {
+ FileSystem localFS = FileSystem.getLocal(new Configuration());
+ Path path = getResultPath(clazz, fileName);
+ Preconditions.checkState(localFS.exists(path) && localFS.isFile(path));
+ return FileUtil.readTextFile(new File(path.toUri()));
+ }
+
+ private static FileFragment getFileFragment(String fileName) throws IOException {
+ TajoConf conf = new TajoConf();
+ Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName);
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileStatus status = fs.getFileStatus(tablePath);
+ return new FileFragment("table", tablePath, 0, status.getLen());
+ }
+
+ @Test
+ public void testIgnoreAllErrors() throws IOException {
+ TajoConf conf = new TajoConf();
+
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
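+ // A tolerance of -1 is unlimited: malformed rows are skipped, leaving only the valid ones.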
+ meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
+ FileFragment fragment = getFileFragment("testErrorTolerance1.json");
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple tuple;
+ int i = 0;
+ while ((tuple = scanner.next()) != null) {
+ assertEquals(baseTuple, tuple);
+ i++;
+ }
+ assertEquals(3, i);
+ scanner.close();
+ }
+
+ @Test
+ public void testIgnoreOneErrorTolerance() throws IOException {
+ TajoConf conf = new TajoConf();
+
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
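+ // A tolerance of 1 skips the first malformed row; the next one must surface as an IOException.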
+ meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
+ FileFragment fragment = getFileFragment("testErrorTolerance1.json");
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ assertNotNull(scanner.next());
+ assertNotNull(scanner.next());
+ try {
+ scanner.next();
+ } catch (IOException ioe) {
+ System.out.println(ioe);
+ return;
+ } finally {
+ scanner.close();
+ }
+ fail();
+ }
+
+ @Test
+ public void testNoErrorTolerance() throws IOException {
+ TajoConf conf = new TajoConf();
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+ meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
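+ // Zero tolerance: the very first malformed row must fail the scan.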
+ FileFragment fragment = getFileFragment("testErrorTolerance2.json");
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ try {
+ scanner.next();
+ } catch (IOException ioe) {
+ return;
+ } finally {
+ scanner.close();
+ }
+ fail();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
new file mode 100644
index 0000000..19a39a2
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class TestFileStorageManager {
+ private TajoConf conf;
+ private static final String TEST_PATH = "target/test-data/TestFileStorageManager";
+ StorageManager sm = null;
+ private Path testDir;
+ private FileSystem fs;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new TajoConf();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public final void testGetScannerAndAppender() throws IOException {
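+ // Round-trip test: write four tuples through an appender, then read them back with a scanner.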
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age",Type.INT4);
+ schema.addColumn("name",Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ Tuple[] tuples = new Tuple[4];
+ for (int i = 0; i < tuples.length; i++) {
+ tuples[i] = new VTuple(3);
+ tuples[i].put(new Datum[] {
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i + 32),
+ DatumFactory.createText("name" + i)});
+ }
+
+ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+ fs.mkdirs(path.getParent());
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, path);
+ appender.init();
+ for(Tuple t : tuples) {
+ appender.addTuple(t);
+ }
+ appender.close();
+
+ Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(meta, schema, path);
+ scanner.init();
+ int i = 0;
+ while (scanner.next() != null) {
+ i++;
+ }
+ assertEquals(4, i);
+ }
+
+ @Test
+ public void testGetSplit() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
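+ // Block storage metadata is disabled, so split volume ids cannot be resolved
+ // and are expected to be -1 below.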
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+
+ int testCount = 10;
+ Path tablePath = new Path("/testGetSplit");
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create test partitions
+ List<Path> partitions = Lists.newArrayList();
+ for (int i = 0; i < testCount; i++) {
+ Path tmpFile = new Path(tablePath, String.valueOf(i));
+ DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADL);
+ partitions.add(tmpFile);
+ }
+
+ assertTrue(fs.exists(tablePath));
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age",Type.INT4);
+ schema.addColumn("name",Type.TEXT);
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ List<Fragment> splits = Lists.newArrayList();
+ // Get FileFragments in partition batch
+ splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+ assertEquals(testCount, splits.size());
+ // -1 is unknown volumeId
+ assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+
+ splits.clear();
+ splits.addAll(sm.getSplits("data", meta, schema,
+ partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
+ assertEquals(testCount / 2, splits.size());
+ assertEquals(1, splits.get(0).getHosts().length);
+ assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ fs.close();
+ } finally {
+ cluster.shutdown();
+
+ File dir = new File(testDataPath);
+ dir.delete();
+ }
+ }
+
+ @Test
+ public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
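+ // With block storage metadata enabled, getSplits can resolve the disk volume of each block.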
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
+
+ int testCount = 10;
+ Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create test files
+ for (int i = 0; i < testCount; i++) {
+ Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
+ DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADL);
+ }
+ assertTrue(fs.exists(tablePath));
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ List<Fragment> splits = Lists.newArrayList();
+ splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+
+ assertEquals(testCount, splits.size());
+ assertEquals(2, splits.get(0).getHosts().length);
+ assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
+ assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ fs.close();
+ } finally {
+ cluster.shutdown();
+
+ File dir = new File(testDataPath);
+ dir.delete();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
new file mode 100644
index 0000000..ff7fe13
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestFileSystems {
+
+ private static final String TEST_PATH = "target/test-data/TestFileSystem";
+ private TajoConf conf;
+ private FileStorageManager sm;
+ private FileSystem fs;
+ private Path testDir;
+
+ public TestFileSystems(FileSystem fs) throws IOException {
+ this.fs = fs;
+ this.conf = new TajoConf(fs.getConf());
+ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ testDir = getTestDir(this.fs, TEST_PATH);
+ }
+
+ public Path getTestDir(FileSystem fs, String dir) throws IOException {
+ Path path = new Path(dir);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+
+ fs.mkdirs(path);
+
+ return fs.makeQualified(path);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() throws IOException {
+ return Arrays.asList(new Object[][]{
+ {FileSystem.getLocal(new TajoConf())},
+ });
+ }
+
+ @Before
+ public void setup() throws IOException {
+ if (!(fs instanceof LocalFileSystem)) {
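+ // Force a tiny block size so even small test files span multiple blocks.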
+ conf.set("fs.local.block.size", "10");
+ fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
+ fs.setConf(conf);
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (!(fs instanceof LocalFileSystem)) {
+ fs.setConf(new TajoConf());
+ }
+ }
+
+ @Test
+ public void testBlockSplit() throws IOException {
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ Tuple[] tuples = new Tuple[4];
+ for (int i = 0; i < tuples.length; i++) {
+ tuples[i] = new VTuple(3);
+ tuples[i].put(new Datum[]{
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i + 32),
+ DatumFactory.createText("name" + i)});
+ }
+
+ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
+ "table.csv");
+ fs.mkdirs(path.getParent());
+
+ Appender appender = sm.getAppender(meta, schema, path);
+ appender.init();
+ for (Tuple t : tuples) {
+ appender.addTuple(t);
+ }
+ appender.close();
+ FileStatus fileStatus = fs.getFileStatus(path);
+
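+ // Expect one fragment per file-system block, and no fragment longer than a block.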
+ List<Fragment> splits = sm.getSplits("table", meta, schema, path);
+ int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
+ assertEquals(splitSize, splits.size());
+
+ for (Fragment fragment : splits) {
+ assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
+ }
+ }
+}
\ No newline at end of file