Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:18 UTC
[03/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
new file mode 100644
index 0000000..9c167a0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.api.ReadSupport.ReadContext;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.GlobalMetaData;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Read records from a Parquet file.
+ */
+public class ParquetReader<T> implements Closeable {
+
+ private ReadSupport<T> readSupport;
+ private UnboundRecordFilter filter;
+ private Configuration conf;
+ private ReadContext readContext;
+ private Iterator<Footer> footersIterator;
+ private InternalParquetRecordReader<T> reader;
+ private GlobalMetaData globalMetaData;
+
+ /**
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
+ this(file, readSupport, null);
+ }
+
+ /**
+ * @param conf the configuration
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
+ this(conf, file, readSupport, null);
+ }
+
+ /**
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @param filter the filter to use to filter records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+ this(new Configuration(), file, readSupport, filter);
+ }
+
+ /**
+ * @param conf the configuration
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @param filter the filter to use to filter records
+ * @throws java.io.IOException
+ */
+ public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+ this.readSupport = readSupport;
+ this.filter = filter;
+ this.conf = conf;
+
+ FileSystem fs = file.getFileSystem(conf);
+ List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
+ List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+ this.footersIterator = footers.iterator();
+ globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
+
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ for (Footer footer : footers) {
+ blocks.addAll(footer.getParquetMetadata().getBlocks());
+ }
+
+ MessageType schema = globalMetaData.getSchema();
+ Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
+ readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
+ }
+
+ /**
+ * @return the next record or null if finished
+ * @throws java.io.IOException
+ */
+ public T read() throws IOException {
+ try {
+ if (reader != null && reader.nextKeyValue()) {
+ return reader.getCurrentValue();
+ } else {
+ initReader();
+ return reader == null ? null : read();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void initReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ if (footersIterator.hasNext()) {
+ Footer footer = footersIterator.next();
+ reader = new InternalParquetRecordReader<T>(readSupport, filter);
+ reader.initialize(
+ readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
+ readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
new file mode 100644
index 0000000..7527437
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.column.ParquetProperties;
+import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class ParquetWriter<T> implements Closeable {
+
+ public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
+ public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+ public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
+ CompressionCodecName.UNCOMPRESSED;
+ public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+ public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
+ public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
+ ParquetProperties.WriterVersion.PARQUET_1_0;
+
+ private final InternalParquetRecordWriter<T> writer;
+
+ /**
+ * Create a new ParquetWriter.
+ * (with dictionary encoding enabled and validation off)
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean)
+ */
+ public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+ DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold (both data and dictionary)
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ boolean enableDictionary,
+ boolean validating) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+ dictionaryPageSize, enableDictionary, validating,
+ DEFAULT_WRITER_VERSION);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
+ * configuration from the classpath.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+   * @param writerVersion the Parquet format version to write, from {@link parquet.column.ParquetProperties.WriterVersion}
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ ParquetProperties.WriterVersion writerVersion) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+   * @param writerVersion the Parquet format version to write, from {@link parquet.column.ParquetProperties.WriterVersion}
+ * @param conf Hadoop configuration to use while accessing the filesystem
+ * @throws java.io.IOException
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ ParquetProperties.WriterVersion writerVersion,
+ Configuration conf) throws IOException {
+
+ WriteSupport.WriteContext writeContext = writeSupport.init(conf);
+ MessageType schema = writeContext.getSchema();
+
+ ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
+ fileWriter.start();
+
+ CodecFactory codecFactory = new CodecFactory(conf);
+ CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
+ this.writer = new InternalParquetRecordWriter<T>(
+ fileWriter,
+ writeSupport,
+ schema,
+ writeContext.getExtraMetaData(),
+ blockSize,
+ pageSize,
+ compressor,
+ dictionaryPageSize,
+ enableDictionary,
+ validating,
+ writerVersion);
+ }
+
+ /**
+   * Create a new ParquetWriter using the defaults declared above: a 128 MB block
+   * size, a 1 MB page size, no compression, and dictionary encoding enabled.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @throws java.io.IOException
+ */
+ public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
+ this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+ }
+
+ public void write(T object) throws IOException {
+ try {
+ writer.write(object);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public long getEstimatedWrittenSize() throws IOException {
+ return this.writer.getEstimatedWrittenSize();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ writer.close();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+  }
+}
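A corresponding write sketch, again assuming parquet-mr's example Group and
GroupWriteSupport types. GroupWriteSupport reads its schema back out of the
Configuration in init(conf), so the overload that takes an explicit
Configuration is used here; the schema and output path are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
    import parquet.example.data.Group;
    import parquet.example.data.simple.SimpleGroupFactory;
    import parquet.hadoop.example.GroupWriteSupport;
    import parquet.schema.MessageType;
    import parquet.schema.MessageTypeParser;

    public class ParquetWriteExample {
      public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message pair { required int32 id; required binary name; }");
        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);  // init(conf) reads the schema back

        ParquetWriter<Group> writer = new ParquetWriter<Group>(
            new Path("/tmp/pairs.parquet"),
            new GroupWriteSupport(),
            ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME,
            ParquetWriter.DEFAULT_BLOCK_SIZE,
            ParquetWriter.DEFAULT_PAGE_SIZE,
            ParquetWriter.DEFAULT_PAGE_SIZE,            // dictionary page size
            ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
            ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
            ParquetWriter.DEFAULT_WRITER_VERSION,
            conf);

        SimpleGroupFactory factory = new SimpleGroupFactory(schema);
        for (int i = 0; i < 10; i++) {
          writer.write(factory.newGroup().append("id", i).append("name", "name" + i));
        }
        writer.close();
      }
    }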
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000..ce9aab6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage";
+option java_outer_classname = "StorageFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message FileFragmentProto {
+ required string id = 1;
+ required string path = 2;
+ required int64 startOffset = 3;
+ required int64 length = 4;
+ repeated string hosts = 5;
+ repeated int32 diskIds = 6;
+}
\ No newline at end of file
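After protoc compilation this message is exposed through the generated builder
API in the StorageFragmentProtos outer class. A round-trip sketch with
illustrative field values (parseFrom's checked exception is covered by the
throws clause):

    import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto;

    public class FragmentProtoExample {
      public static void main(String[] args) throws Exception {
        FileFragmentProto proto = FileFragmentProto.newBuilder()
            .setId("default.lineitem")                               // illustrative values
            .setPath("hdfs:///tajo/warehouse/lineitem/lineitem.tbl")
            .setStartOffset(0L)
            .setLength(64L * 1024 * 1024)
            .addHosts("datanode01.example.com")  // repeated fields use add*()
            .addDiskIds(-1)                      // -1 marks an unknown volume id
            .build();

        // the required fields make parseFrom reject incomplete messages
        byte[] bytes = proto.toByteArray();
        FileFragmentProto restored = FileFragmentProto.parseFrom(bytes);
        System.out.println(restored.getPath());
      }
    }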
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
new file mode 100644
index 0000000..cf8a54e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+public class HttpFileServer {
+ private final static Log LOG = LogFactory.getLog(HttpFileServer.class);
+
+ private final InetSocketAddress addr;
+ private InetSocketAddress bindAddr;
+ private ServerBootstrap bootstrap = null;
+ private ChannelFactory factory = null;
+ private ChannelGroup channelGroup = null;
+
+ public HttpFileServer(final InetSocketAddress addr) {
+ this.addr = addr;
+ this.factory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+ 2);
+
+ // Configure the server.
+ this.bootstrap = new ServerBootstrap(factory);
+ // Set up the event pipeline factory.
+ this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
+ this.channelGroup = new DefaultChannelGroup();
+ }
+
+ public HttpFileServer(String bindaddr) {
+ this(NetUtils.createSocketAddr(bindaddr));
+ }
+
+ public void start() {
+ // Bind and start to accept incoming connections.
+ Channel channel = bootstrap.bind(addr);
+ channelGroup.add(channel);
+ this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+ LOG.info("HttpFileServer starts up ("
+ + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
+ + ")");
+ }
+
+ public InetSocketAddress getBindAddress() {
+ return this.bindAddr;
+ }
+
+ public void stop() {
+ ChannelGroupFuture future = channelGroup.close();
+ future.awaitUninterruptibly();
+ factory.releaseExternalResources();
+
+ LOG.info("HttpFileServer shutdown ("
+ + this.bindAddr.getAddress().getHostAddress() + ":"
+ + this.bindAddr.getPort() + ")");
+ }
+}
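A start/fetch/stop sketch for this test server. Binding to port 0 lets the OS
pick a free port, and the handler below maps the request path directly onto
the local filesystem, so a GET for /tmp/sample.dat serves that local file (the
path is illustrative; error handling is elided).

    import java.io.InputStream;
    import java.net.InetSocketAddress;
    import java.net.URL;

    import org.apache.tajo.HttpFileServer;

    public class HttpFileServerExample {
      public static void main(String[] args) throws Exception {
        HttpFileServer server = new HttpFileServer("127.0.0.1:0");  // port 0: any free port
        server.start();
        InetSocketAddress addr = server.getBindAddress();

        // plain HTTP GET against the bound port fetches a local file
        URL url = new URL("http://127.0.0.1:" + addr.getPort() + "/tmp/sample.dat");
        InputStream in = url.openStream();
        try {
          // ... consume the stream ...
        } finally {
          in.close();
          server.stop();
        }
      }
    }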
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
new file mode 100644
index 0000000..6c77317
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * An implementation copied from HttpStaticFileServerHandler.java of Netty 3.6.
+ */
+public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ HttpRequest request = (HttpRequest) e.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ final String path = sanitizeUri(request.getUri());
+ if (path == null) {
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+
+ File file = new File(path);
+ if (file.isHidden() || !file.exists()) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ if (!file.isFile()) {
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+
+ RandomAccessFile raf;
+ try {
+ raf = new RandomAccessFile(file, "r");
+ } catch (FileNotFoundException fnfe) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ long fileLength = raf.length();
+
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ setContentLength(response, fileLength);
+ setContentTypeHeader(response);
+
+ Channel ch = e.getChannel();
+
+ // Write the initial line and the header.
+ ch.write(response);
+
+ // Write the content.
+ ChannelFuture writeFuture;
+ if (ch.getPipeline().get(SslHandler.class) != null) {
+ // Cannot use zero-copy with HTTPS.
+ writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
+ } else {
+ // No encryption - use zero-copy.
+ final FileRegion region =
+ new DefaultFileRegion(raf.getChannel(), 0, fileLength);
+ writeFuture = ch.write(region);
+ writeFuture.addListener(new ChannelFutureProgressListener() {
+ public void operationComplete(ChannelFuture future) {
+ region.releaseExternalResources();
+ }
+
+ public void operationProgressed(
+ ChannelFuture future, long amount, long current, long total) {
+ System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
+ }
+ });
+ }
+
+ // Decide whether to close the connection or not.
+ if (!isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ if (cause instanceof TooLongFrameException) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ cause.printStackTrace();
+ if (ch.isConnected()) {
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private static String sanitizeUri(String uri) {
+ // Decode the path.
+ try {
+ uri = URLDecoder.decode(uri, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ try {
+ uri = URLDecoder.decode(uri, "ISO-8859-1");
+ } catch (UnsupportedEncodingException e1) {
+ throw new Error();
+ }
+ }
+
+ // Convert file separators.
+ uri = uri.replace('/', File.separatorChar);
+
+ // Simplistic dumb security check.
+ // You will have to do something serious in the production environment.
+ if (uri.contains(File.separator + '.') ||
+ uri.contains('.' + File.separator) ||
+ uri.startsWith(".") || uri.endsWith(".")) {
+ return null;
+ }
+
+ return uri;
+ }
+
+ private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setContent(ChannelBuffers.copiedBuffer(
+ "Failure: " + status.toString() + "\r\n",
+ CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ /**
+ * Sets the content type header for the HTTP Response
+ *
+ * @param response
+ * HTTP response
+ */
+ private static void setContentTypeHeader(HttpResponse response) {
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
new file mode 100644
index 0000000..cecf93b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+// Uncomment the following lines if you want HTTPS
+//import javax.net.ssl.SSLEngine;
+//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
+//import org.jboss.netty.handler.ssl.SslHandler;
+
+// This class is copied from HttpStaticFileServerPipelineFactory.java of Netty 3.6.
+public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = pipeline();
+
+ // Uncomment the following lines if you want HTTPS
+ //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ //engine.setUseClientMode(false);
+ //pipeline.addLast("ssl", new SslHandler(engine));
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+
+ pipeline.addLast("handler", new HttpFileServerHandler());
+ return pipeline;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
new file mode 100644
index 0000000..3c78d6b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestCompressionStorages {
+ private TajoConf conf;
+ private static String TEST_PATH = "target/test-data/TestCompressionStorages";
+
+ private StoreType storeType;
+ private Path testDir;
+ private FileSystem fs;
+
+ public TestCompressionStorages(StoreType type) throws IOException {
+ this.storeType = type;
+ conf = new TajoConf();
+
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][]{
+ {StoreType.CSV},
+ {StoreType.RCFILE},
+ {StoreType.SEQUENCEFILE},
+ {StoreType.TEXTFILE}
+ });
+ }
+
+ @Test
+ public void testDeflateCodecCompressionData() throws IOException {
+ storageCompressionTest(storeType, DeflateCodec.class);
+ }
+
+ @Test
+ public void testGzipCodecCompressionData() throws IOException {
+    if (storeType == StoreType.RCFILE || storeType == StoreType.SEQUENCEFILE) {
+      // RCFile and SequenceFile need the native zlib library for gzip.
+      if (ZlibFactory.isNativeZlibLoaded(conf)) {
+        storageCompressionTest(storeType, GzipCodec.class);
+      }
+    } else {
+      storageCompressionTest(storeType, GzipCodec.class);
+    }
+ }
+
+ @Test
+ public void testSnappyCodecCompressionData() throws IOException {
+ if (SnappyCodec.isNativeCodeLoaded()) {
+ storageCompressionTest(storeType, SnappyCodec.class);
+ }
+ }
+
+ @Test
+ public void testLz4CodecCompressionData() throws IOException {
+    if (NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) {
+      storageCompressionTest(storeType, Lz4Codec.class);
+    }
+ }
+
+ private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.FLOAT4);
+ schema.addColumn("name", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ meta.putOption("compression.codec", codec.getCanonicalName());
+ meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name());
+ meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
+ meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());
+
+ String fileName = "Compression_" + codec.getSimpleName();
+ Path tablePath = new Path(testDir, fileName);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+ appender.enableStats();
+
+ appender.init();
+
+ String extension = "";
+ if (appender instanceof CSVFile.CSVAppender) {
+ extension = ((CSVFile.CSVAppender) appender).getExtension();
+ } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) {
+ extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
+ }
+
+ int tupleNum = 100000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(3);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createFloat4((float) i));
+ vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+
+ TableStats stat = appender.getStats();
+ assertEquals(tupleNum, stat.getNumRows().longValue());
+ tablePath = tablePath.suffix(extension);
+ FileStatus status = fs.getFileStatus(tablePath);
+ long fileLen = status.getLen();
+ FileFragment[] tablets = new FileFragment[1];
+ tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
+
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+
+ if (StoreType.CSV == storeType) {
+ if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
+ assertTrue(scanner.isSplittable());
+ } else {
+ assertFalse(scanner.isSplittable());
+ }
+ }
+ scanner.init();
+
+ if (storeType == StoreType.SEQUENCEFILE) {
+ assertTrue(scanner instanceof SequenceFileScanner);
+ Writable key = ((SequenceFileScanner) scanner).getKey();
+ assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+ }
+
+ int tupleCnt = 0;
+ Tuple tuple;
+ while ((tuple = scanner.next()) != null) {
+ tupleCnt++;
+ }
+ scanner.close();
+ assertEquals(tupleNum, tupleCnt);
+    // assertNotSame compares boxed references and would pass vacuously; the
+    // intent is that the compressed on-disk size differs from the raw size.
+    assertNotEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
new file mode 100644
index 0000000..19a39a2
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class TestFileStorageManager {
+ private TajoConf conf;
+ private static String TEST_PATH = "target/test-data/TestFileStorageManager";
+ StorageManager sm = null;
+ private Path testDir;
+ private FileSystem fs;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new TajoConf();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public final void testGetScannerAndAppender() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age",Type.INT4);
+ schema.addColumn("name",Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ Tuple[] tuples = new Tuple[4];
+ for(int i=0; i < tuples.length; i++) {
+ tuples[i] = new VTuple(3);
+ tuples[i].put(new Datum[] {
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i + 32),
+ DatumFactory.createText("name" + i)});
+ }
+
+ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+ fs.mkdirs(path.getParent());
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, path);
+ appender.init();
+ for(Tuple t : tuples) {
+ appender.addTuple(t);
+ }
+ appender.close();
+
+ Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(meta, schema, path);
+ scanner.init();
+    int i = 0;
+    while (scanner.next() != null) {
+      i++;
+    }
+    assertEquals(4, i);
+ }
+
+ @Test
+ public void testGetSplit() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+
+ int testCount = 10;
+ Path tablePath = new Path("/testGetSplit");
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create test partitions
+ List<Path> partitions = Lists.newArrayList();
+ for (int i =0; i < testCount; i++){
+ Path tmpFile = new Path(tablePath, String.valueOf(i));
+        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADL);
+ partitions.add(tmpFile);
+ }
+
+ assertTrue(fs.exists(tablePath));
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age",Type.INT4);
+ schema.addColumn("name",Type.TEXT);
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ List<Fragment> splits = Lists.newArrayList();
+ // Get FileFragments in partition batch
+ splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+ assertEquals(testCount, splits.size());
+ // -1 is unknown volumeId
+ assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+
+ splits.clear();
+ splits.addAll(sm.getSplits("data", meta, schema,
+ partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
+ assertEquals(testCount / 2, splits.size());
+ assertEquals(1, splits.get(0).getHosts().length);
+ assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ fs.close();
+ } finally {
+ cluster.shutdown();
+
+ File dir = new File(testDataPath);
+ dir.delete();
+ }
+ }
+
+ @Test
+ public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
+
+ int testCount = 10;
+ Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create test files
+ for (int i = 0; i < testCount; i++) {
+ Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
+        DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADL);
+ }
+ assertTrue(fs.exists(tablePath));
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ List<Fragment> splits = Lists.newArrayList();
+ splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+
+ assertEquals(testCount, splits.size());
+ assertEquals(2, splits.get(0).getHosts().length);
+ assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
+ assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ fs.close();
+ } finally {
+ cluster.shutdown();
+
+ File dir = new File(testDataPath);
+ dir.delete();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
new file mode 100644
index 0000000..088fda9
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
+import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestFileSystems {
+
+ protected byte[] data = null;
+
+ private static String TEST_PATH = "target/test-data/TestFileSystem";
+ private TajoConf conf = null;
+ private FileStorageManager sm = null;
+ private FileSystem fs = null;
+ Path testDir;
+
+ public TestFileSystems(FileSystem fs) throws IOException {
+ conf = new TajoConf();
+
+ if(fs instanceof S3FileSystem){
+ conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
+ fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
+ }
+ this.fs = fs;
+ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ testDir = getTestDir(this.fs, TEST_PATH);
+ }
+
+ public Path getTestDir(FileSystem fs, String dir) throws IOException {
+ Path path = new Path(dir);
+    if(fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+ fs.mkdirs(path);
+
+ return fs.makeQualified(path);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
+ });
+ }
+
+ @Test
+ public void testBlockSplit() throws IOException {
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ Tuple[] tuples = new Tuple[4];
+ for (int i = 0; i < tuples.length; i++) {
+ tuples[i] = new VTuple(3);
+ tuples[i]
+ .put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i + 32),
+ DatumFactory.createText("name" + i) });
+ }
+
+ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
+ "table.csv");
+ fs.mkdirs(path.getParent());
+
+ Appender appender = sm.getAppender(meta, schema, path);
+ appender.init();
+ for (Tuple t : tuples) {
+ appender.addTuple(t);
+ }
+ appender.close();
+ FileStatus fileStatus = fs.getFileStatus(path);
+
+ List<Fragment> splits = sm.getSplits("table", meta, schema, path);
+ int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
+ assertEquals(splitSize, splits.size());
+
+ for (Fragment fragment : splits) {
+ assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
new file mode 100644
index 0000000..a0daa7d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestMergeScanner {
+ private TajoConf conf;
+ StorageManager sm;
+ private static String TEST_PATH = "target/test-data/TestMergeScanner";
+
+ private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testMultipleFiles\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"id\", \"type\": \"int\" },\n" +
+ " { \"name\": \"file\", \"type\": \"string\" },\n" +
+ " { \"name\": \"name\", \"type\": \"string\" },\n" +
+ " { \"name\": \"age\", \"type\": \"long\" }\n" +
+ " ]\n" +
+ "}\n";
+
+ private Path testDir;
+ private StoreType storeType;
+ private FileSystem fs;
+
+ public TestMergeScanner(StoreType storeType) {
+ this.storeType = storeType;
+ }
+
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ {StoreType.CSV},
+ {StoreType.RAW},
+ {StoreType.RCFILE},
+ {StoreType.PARQUET},
+ {StoreType.SEQUENCEFILE},
+ {StoreType.AVRO},
+      // RowFile requires byte-buffer read support, so it is omitted here.
+ //{StoreType.ROWFILE},
+ });
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new TajoConf();
+ conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+ conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
+ }
+
+ @Test
+ public void testMultipleFiles() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("file", Type.TEXT);
+ schema.addColumn("name", Type.TEXT);
+ schema.addColumn("age", Type.INT8);
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ if (storeType == StoreType.AVRO) {
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_MULTIPLE_FILES_AVRO_SCHEMA);
+ }
+
+ Path table1Path = new Path(testDir, storeType + "_1.data");
+ Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+ appender1.enableStats();
+ appender1.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for(int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createText("hyunsik"));
+ vTuple.put(2, DatumFactory.createText("jihoon"));
+      vTuple.put(3, DatumFactory.createInt8(25L));
+ appender1.addTuple(vTuple);
+ }
+ appender1.close();
+
+ TableStats stat1 = appender1.getStats();
+ if (stat1 != null) {
+ assertEquals(tupleNum, stat1.getNumRows().longValue());
+ }
+
+ Path table2Path = new Path(testDir, storeType + "_2.data");
+ Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+ appender2.enableStats();
+ appender2.init();
+
+ for(int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createText("hyunsik"));
+ vTuple.put(2, DatumFactory.createText("jihoon"));
+      vTuple.put(3, DatumFactory.createInt8(25L));
+ appender2.addTuple(vTuple);
+ }
+ appender2.close();
+
+ TableStats stat2 = appender2.getStats();
+ if (stat2 != null) {
+ assertEquals(tupleNum, stat2.getNumRows().longValue());
+ }
+
+
+ FileStatus status1 = fs.getFileStatus(table1Path);
+ FileStatus status2 = fs.getFileStatus(table2Path);
+ Fragment[] fragment = new Fragment[2];
+ fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
+ fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
+
+ Schema targetSchema = new Schema();
+ targetSchema.addColumn(schema.getColumn(0));
+ targetSchema.addColumn(schema.getColumn(2));
+
+ Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
+ assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
+
+ scanner.init();
+ int totalCounts = 0;
+ Tuple tuple;
+ while ((tuple = scanner.next()) != null) {
+ totalCounts++;
+ if (isProjectableStorage(meta.getStoreType())) {
+ assertNotNull(tuple.get(0));
+ assertNull(tuple.get(1));
+ assertNotNull(tuple.get(2));
+ assertNull(tuple.get(3));
+ }
+ }
+ scanner.close();
+
+ assertEquals(tupleNum * 2, totalCounts);
+ }
+
+ private static boolean isProjectableStorage(StoreType type) {
+ switch (type) {
+ case RCFILE:
+ case PARQUET:
+ case SEQUENCEFILE:
+ case CSV:
+ case AVRO:
+ return true;
+ default:
+ return false;
+ }
+ }
+}