You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:55 UTC
[18/18] git commit: TAJO-520: Move tajo-core-storage to tajo-storage.
(jinho)
TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bbf9b7bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bbf9b7bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bbf9b7bf
Branch: refs/heads/master
Commit: bbf9b7bf8b60a930b4e44754c732f9629762d1bf
Parents: 6aa96fa
Author: jinossy <ji...@gmail.com>
Authored: Tue Jan 28 21:34:41 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Tue Jan 28 21:34:41 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
pom.xml | 1 +
tajo-client/pom.xml | 2 +-
tajo-core/pom.xml | 2 -
tajo-core/tajo-core-backend/pom.xml | 2 +-
tajo-core/tajo-core-pullserver/pom.xml | 2 +-
tajo-core/tajo-core-storage/pom.xml | 301 ---
.../tajo/storage/AbstractStorageManager.java | 690 -------
.../java/org/apache/tajo/storage/Appender.java | 39 -
.../storage/BinarySerializerDeserializer.java | 257 ---
.../java/org/apache/tajo/storage/CSVFile.java | 531 -----
.../tajo/storage/CompressedSplitLineReader.java | 182 --
.../org/apache/tajo/storage/DataLocation.java | 45 -
.../org/apache/tajo/storage/FileAppender.java | 61 -
.../org/apache/tajo/storage/FileScanner.java | 93 -
.../org/apache/tajo/storage/FrameTuple.java | 231 ---
.../java/org/apache/tajo/storage/LazyTuple.java | 291 ---
.../org/apache/tajo/storage/LineReader.java | 559 ------
.../org/apache/tajo/storage/MergeScanner.java | 154 --
.../tajo/storage/NumericPathComparator.java | 34 -
.../java/org/apache/tajo/storage/RawFile.java | 532 -----
.../java/org/apache/tajo/storage/RowFile.java | 506 -----
.../org/apache/tajo/storage/RowStoreUtil.java | 206 --
.../java/org/apache/tajo/storage/Scanner.java | 94 -
.../apache/tajo/storage/SeekableScanner.java | 28 -
.../tajo/storage/SerializerDeserializer.java | 34 -
.../apache/tajo/storage/SplitLineReader.java | 39 -
.../java/org/apache/tajo/storage/Storage.java | 45 -
.../org/apache/tajo/storage/StorageManager.java | 67 -
.../tajo/storage/StorageManagerFactory.java | 98 -
.../org/apache/tajo/storage/StorageUtil.java | 83 -
.../apache/tajo/storage/TableStatistics.java | 117 --
.../storage/TextSerializerDeserializer.java | 209 --
.../java/org/apache/tajo/storage/Tuple.java | 82 -
.../apache/tajo/storage/TupleComparator.java | 159 --
.../org/apache/tajo/storage/TupleRange.java | 103 -
.../java/org/apache/tajo/storage/VTuple.java | 226 ---
.../storage/annotation/ForSplitableStore.java | 29 -
.../apache/tajo/storage/compress/CodecPool.java | 185 --
.../AlreadyExistsStorageException.java | 39 -
.../exception/UnknownCodecException.java | 32 -
.../exception/UnknownDataTypeException.java | 32 -
.../exception/UnsupportedFileTypeException.java | 36 -
.../tajo/storage/fragment/FileFragment.java | 219 ---
.../apache/tajo/storage/fragment/Fragment.java | 31 -
.../storage/fragment/FragmentConvertor.java | 123 --
.../apache/tajo/storage/index/IndexMethod.java | 32 -
.../apache/tajo/storage/index/IndexReader.java | 35 -
.../apache/tajo/storage/index/IndexWriter.java | 33 -
.../tajo/storage/index/OrderIndexReader.java | 45 -
.../apache/tajo/storage/index/bst/BSTIndex.java | 623 ------
.../storage/rcfile/BytesRefArrayWritable.java | 261 ---
.../tajo/storage/rcfile/BytesRefWritable.java | 248 ---
.../storage/rcfile/ColumnProjectionUtils.java | 117 --
.../rcfile/LazyDecompressionCallback.java | 32 -
.../rcfile/NonSyncByteArrayInputStream.java | 113 --
.../rcfile/NonSyncByteArrayOutputStream.java | 144 --
.../storage/rcfile/NonSyncDataInputBuffer.java | 507 -----
.../storage/rcfile/NonSyncDataOutputBuffer.java | 91 -
.../org/apache/tajo/storage/rcfile/RCFile.java | 1739 -----------------
.../SchemaAwareCompressionInputStream.java | 43 -
.../SchemaAwareCompressionOutputStream.java | 44 -
.../tajo/storage/trevni/TrevniAppender.java | 201 --
.../tajo/storage/trevni/TrevniScanner.java | 193 --
.../apache/tajo/storage/v2/CSVFileScanner.java | 386 ----
.../apache/tajo/storage/v2/DiskDeviceInfo.java | 62 -
.../tajo/storage/v2/DiskFileScanScheduler.java | 205 --
.../org/apache/tajo/storage/v2/DiskInfo.java | 75 -
.../apache/tajo/storage/v2/DiskMountInfo.java | 101 -
.../org/apache/tajo/storage/v2/DiskUtil.java | 199 --
.../apache/tajo/storage/v2/FileScanRunner.java | 70 -
.../apache/tajo/storage/v2/FileScannerV2.java | 203 --
.../java/org/apache/tajo/storage/v2/RCFile.java | 1823 ------------------
.../apache/tajo/storage/v2/RCFileScanner.java | 297 ---
.../apache/tajo/storage/v2/ScanScheduler.java | 189 --
.../tajo/storage/v2/ScheduledInputStream.java | 513 -----
.../tajo/storage/v2/StorageManagerV2.java | 140 --
.../src/main/proto/IndexProtos.proto | 29 -
.../src/main/resources/storage-default.xml | 149 --
.../tajo/storage/TestCompressionStorages.java | 233 ---
.../org/apache/tajo/storage/TestFrameTuple.java | 84 -
.../org/apache/tajo/storage/TestLazyTuple.java | 258 ---
.../apache/tajo/storage/TestMergeScanner.java | 179 --
.../apache/tajo/storage/TestStorageManager.java | 93 -
.../org/apache/tajo/storage/TestStorages.java | 375 ----
.../tajo/storage/TestTupleComparator.java | 77 -
.../org/apache/tajo/storage/TestVTuple.java | 160 --
.../apache/tajo/storage/index/TestBSTIndex.java | 948 ---------
.../index/TestSingleCSVFileBSTIndex.java | 248 ---
.../tajo/storage/v2/TestCSVCompression.java | 213 --
.../apache/tajo/storage/v2/TestCSVScanner.java | 168 --
.../apache/tajo/storage/v2/TestStorages.java | 242 ---
.../src/test/resources/storage-default.xml | 149 --
tajo-dist/pom.xml | 6 +
tajo-jdbc/pom.xml | 2 +-
tajo-project/pom.xml | 2 +-
tajo-storage/pom.xml | 383 ++++
.../tajo/storage/AbstractStorageManager.java | 690 +++++++
.../java/org/apache/tajo/storage/Appender.java | 39 +
.../storage/BinarySerializerDeserializer.java | 257 +++
.../java/org/apache/tajo/storage/CSVFile.java | 531 +++++
.../tajo/storage/CompressedSplitLineReader.java | 182 ++
.../org/apache/tajo/storage/DataLocation.java | 45 +
.../org/apache/tajo/storage/FileAppender.java | 61 +
.../org/apache/tajo/storage/FileScanner.java | 93 +
.../org/apache/tajo/storage/FrameTuple.java | 231 +++
.../java/org/apache/tajo/storage/LazyTuple.java | 291 +++
.../org/apache/tajo/storage/LineReader.java | 559 ++++++
.../org/apache/tajo/storage/MergeScanner.java | 154 ++
.../tajo/storage/NumericPathComparator.java | 34 +
.../java/org/apache/tajo/storage/RawFile.java | 532 +++++
.../java/org/apache/tajo/storage/RowFile.java | 506 +++++
.../org/apache/tajo/storage/RowStoreUtil.java | 206 ++
.../java/org/apache/tajo/storage/Scanner.java | 94 +
.../apache/tajo/storage/SeekableScanner.java | 28 +
.../tajo/storage/SerializerDeserializer.java | 34 +
.../apache/tajo/storage/SplitLineReader.java | 39 +
.../java/org/apache/tajo/storage/Storage.java | 45 +
.../org/apache/tajo/storage/StorageManager.java | 67 +
.../tajo/storage/StorageManagerFactory.java | 98 +
.../org/apache/tajo/storage/StorageUtil.java | 83 +
.../apache/tajo/storage/TableStatistics.java | 117 ++
.../storage/TextSerializerDeserializer.java | 209 ++
.../java/org/apache/tajo/storage/Tuple.java | 82 +
.../apache/tajo/storage/TupleComparator.java | 159 ++
.../org/apache/tajo/storage/TupleRange.java | 103 +
.../java/org/apache/tajo/storage/VTuple.java | 226 +++
.../storage/annotation/ForSplitableStore.java | 29 +
.../apache/tajo/storage/compress/CodecPool.java | 185 ++
.../AlreadyExistsStorageException.java | 39 +
.../exception/UnknownCodecException.java | 32 +
.../exception/UnknownDataTypeException.java | 32 +
.../exception/UnsupportedFileTypeException.java | 36 +
.../tajo/storage/fragment/FileFragment.java | 219 +++
.../apache/tajo/storage/fragment/Fragment.java | 31 +
.../storage/fragment/FragmentConvertor.java | 123 ++
.../apache/tajo/storage/index/IndexMethod.java | 32 +
.../apache/tajo/storage/index/IndexReader.java | 35 +
.../apache/tajo/storage/index/IndexWriter.java | 33 +
.../tajo/storage/index/OrderIndexReader.java | 45 +
.../apache/tajo/storage/index/bst/BSTIndex.java | 623 ++++++
.../storage/rcfile/BytesRefArrayWritable.java | 261 +++
.../tajo/storage/rcfile/BytesRefWritable.java | 248 +++
.../storage/rcfile/ColumnProjectionUtils.java | 117 ++
.../rcfile/LazyDecompressionCallback.java | 32 +
.../rcfile/NonSyncByteArrayInputStream.java | 113 ++
.../rcfile/NonSyncByteArrayOutputStream.java | 144 ++
.../storage/rcfile/NonSyncDataInputBuffer.java | 507 +++++
.../storage/rcfile/NonSyncDataOutputBuffer.java | 91 +
.../org/apache/tajo/storage/rcfile/RCFile.java | 1739 +++++++++++++++++
.../SchemaAwareCompressionInputStream.java | 43 +
.../SchemaAwareCompressionOutputStream.java | 44 +
.../tajo/storage/trevni/TrevniAppender.java | 201 ++
.../tajo/storage/trevni/TrevniScanner.java | 193 ++
.../apache/tajo/storage/v2/CSVFileScanner.java | 386 ++++
.../apache/tajo/storage/v2/DiskDeviceInfo.java | 62 +
.../tajo/storage/v2/DiskFileScanScheduler.java | 205 ++
.../org/apache/tajo/storage/v2/DiskInfo.java | 75 +
.../apache/tajo/storage/v2/DiskMountInfo.java | 101 +
.../org/apache/tajo/storage/v2/DiskUtil.java | 199 ++
.../apache/tajo/storage/v2/FileScanRunner.java | 70 +
.../apache/tajo/storage/v2/FileScannerV2.java | 203 ++
.../java/org/apache/tajo/storage/v2/RCFile.java | 1823 ++++++++++++++++++
.../apache/tajo/storage/v2/RCFileScanner.java | 297 +++
.../apache/tajo/storage/v2/ScanScheduler.java | 189 ++
.../tajo/storage/v2/ScheduledInputStream.java | 513 +++++
.../tajo/storage/v2/StorageManagerV2.java | 140 ++
tajo-storage/src/main/proto/IndexProtos.proto | 29 +
.../src/main/resources/storage-default.xml | 149 ++
.../tajo/storage/TestCompressionStorages.java | 233 +++
.../org/apache/tajo/storage/TestFrameTuple.java | 84 +
.../org/apache/tajo/storage/TestLazyTuple.java | 258 +++
.../apache/tajo/storage/TestMergeScanner.java | 179 ++
.../apache/tajo/storage/TestStorageManager.java | 93 +
.../org/apache/tajo/storage/TestStorages.java | 375 ++++
.../tajo/storage/TestTupleComparator.java | 77 +
.../org/apache/tajo/storage/TestVTuple.java | 160 ++
.../apache/tajo/storage/index/TestBSTIndex.java | 948 +++++++++
.../index/TestSingleCSVFileBSTIndex.java | 248 +++
.../tajo/storage/v2/TestCSVCompression.java | 213 ++
.../apache/tajo/storage/v2/TestCSVScanner.java | 168 ++
.../apache/tajo/storage/v2/TestStorages.java | 242 +++
.../src/test/resources/storage-default.xml | 149 ++
183 files changed, 19287 insertions(+), 19198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3f2bff..9699782 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -386,6 +386,8 @@ Release 0.8.0 - unreleased
TASKS
+ TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
+
TAJO-536: Fix warnings in tajo-core-storage. (jinho)
TAJO-545: MySQLStore Documentation. (jaehwa)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d70f0e4..569e9be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
<module>tajo-client</module>
<module>tajo-jdbc</module>
<module>tajo-dist</module>
+ <module>tajo-storage</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-client/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml
index 9b4cd5e..c6fbb27 100644
--- a/tajo-client/pom.xml
+++ b/tajo-client/pom.xml
@@ -201,7 +201,7 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-core-storage</artifactId>
+ <artifactId>tajo-storage</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 09978c3..e131d0e 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -34,7 +34,6 @@
<modules>
<module>tajo-core-backend</module>
- <module>tajo-core-storage</module>
<module>tajo-core-pullserver</module>
</modules>
@@ -161,7 +160,6 @@
run rm -rf ${project.artifactId}-${project.version}
run mkdir ${project.artifactId}-${project.version}
run cd ${project.artifactId}-${project.version}
- run cp -r ${basedir}/${project.artifactId}-storage/target/${project.artifactId}-storage-${project.version}*.jar .
run cp -r ${basedir}/${project.artifactId}-pullserver/target/${project.artifactId}-pullserver-${project.version}*.jar .
run cp -r ${basedir}/${project.artifactId}-backend/target/${project.artifactId}-backend-${project.version}*.jar .
run cp -r ${basedir}/${project.artifactId}-backend/target/lib .
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index abc217e..fce9925 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -213,7 +213,7 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-core-storage</artifactId>
+ <artifactId>tajo-storage</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/pom.xml b/tajo-core/tajo-core-pullserver/pom.xml
index 8c6d4fe..0bdfed2 100644
--- a/tajo-core/tajo-core-pullserver/pom.xml
+++ b/tajo-core/tajo-core-pullserver/pom.xml
@@ -44,7 +44,7 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-core-storage</artifactId>
+ <artifactId>tajo-storage</artifactId>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/pom.xml b/tajo-core/tajo-core-storage/pom.xml
deleted file mode 100644
index dcbde44..0000000
--- a/tajo-core/tajo-core-storage/pom.xml
+++ /dev/null
@@ -1,301 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Copyright 2012 Database Lab., Korea Univ.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>tajo-project</artifactId>
- <groupId>org.apache.tajo</groupId>
- <version>0.8.0-SNAPSHOT</version>
- <relativePath>../../tajo-project</relativePath>
- </parent>
- <artifactId>tajo-core-storage</artifactId>
- <packaging>jar</packaging>
- <name>Tajo Core Storage</name>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- </properties>
-
- <repositories>
- <repository>
- <id>repository.jboss.org</id>
- <url>https://repository.jboss.org/nexus/content/repositories/releases/
- </url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <id>create-protobuf-generated-sources-directory</id>
- <phase>initialize</phase>
- <configuration>
- <target>
- <mkdir dir="target/generated-sources/proto" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>1.2</version>
- <executions>
- <execution>
- <id>generate-sources</id>
- <phase>generate-sources</phase>
- <configuration>
- <executable>protoc</executable>
- <arguments>
- <argument>-Isrc/main/proto/</argument>
- <argument>--proto_path=../../tajo-common/src/main/proto</argument>
- <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
- <argument>--java_out=target/generated-sources/proto</argument>
- <argument>src/main/proto/IndexProtos.proto</argument>
- </arguments>
- </configuration>
- <goals>
- <goal>exec</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>target/generated-sources/proto</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-pmd-plugin</artifactId>
- <version>2.7.1</version>
- </plugin>
- </plugins>
- </build>
-
-
- <dependencies>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-catalog-common</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>trevni-core</artifactId>
- <version>1.7.3</version>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>trevni-avro</artifactId>
- <version>1.7.3</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey.jersey-test-framework</groupId>
- <artifactId>jersey-test-framework-grizzly2</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging-api</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.6</version>
- </dependency>
- </dependencies>
-
- <profiles>
- <profile>
- <id>docs</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <execution>
- <!-- build javadoc jars per jar for publishing to maven -->
- <id>module-javadocs</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <destDir>${project.build.directory}</destDir>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>src</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <executions>
- <execution>
- <!-- builds source jars and attaches them to the project for publishing -->
- <id>tajo-java-sources</id>
- <phase>package</phase>
- <goals>
- <goal>jar-no-fork</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
-
-</project>
-
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
deleted file mode 100644
index 91a535e..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ /dev/null
@@ -1,690 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public abstract class AbstractStorageManager {
- private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
-
- protected final TajoConf conf;
- protected final FileSystem fs;
- protected final Path tableBaseDir;
- protected final boolean blocksMetadataEnabled;
-
- /**
- * Cache of scanner handlers for each storage type.
- */
- protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-
- /**
- * Cache of appender handlers for each storage type.
- */
- protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
-
- /**
- * Cache of constructors for each class. Pins the classes so they
- * can't be garbage collected until ReflectionUtils can be collected.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
- public abstract Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException;
-
- public abstract Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
-
- protected AbstractStorageManager(TajoConf conf) throws IOException {
- this.conf = conf;
- this.tableBaseDir = TajoConf.getWarehouseDir(conf);
- this.fs = tableBaseDir.getFileSystem(conf);
- this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
- if (!this.blocksMetadataEnabled)
- LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
- }
-
- public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
- throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- FileStatus status = fs.getFileStatus(path);
- FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
- return getScanner(meta, schema, fragment);
- }
-
- public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
- return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
- }
-
- public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
- return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target);
- }
-
- public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
- return getScanner(meta, schema, fragment, schema);
- }
-
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
- public Path getWarehouseDir() {
- return this.tableBaseDir;
- }
-
- public void delete(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- fs.delete(tablePath, true);
- }
-
- public boolean exists(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- return fileSystem.exists(path);
- }
-
- /**
- * This method deletes only data contained in the given path.
- *
- * @param path The path in which data are deleted.
- * @throws IOException
- */
- public void deleteData(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- FileStatus[] fileLists = fileSystem.listStatus(path);
- for (FileStatus status : fileLists) {
- fileSystem.delete(status.getPath(), true);
- }
- }
-
- public Path getTablePath(String tableName) {
- return new Path(tableBaseDir, tableName);
- }
-
- public Appender getAppender(TableMeta meta, Schema schema, Path path)
- throws IOException {
- Appender appender;
-
- Class<? extends FileAppender> appenderClass;
-
- String handlerName = meta.getStoreType().name().toLowerCase();
- appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
- if (appenderClass == null) {
- appenderClass = conf.getClass(
- String.format("tajo.storage.appender-handler.%s.class",
- meta.getStoreType().name().toLowerCase()), null,
- FileAppender.class);
- APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
- }
-
- if (appenderClass == null) {
- throw new IOException("Unknown Storage Type: " + meta.getStoreType());
- }
-
- appender = newAppenderInstance(appenderClass, conf, meta, schema, path);
-
- return appender;
- }
-
-
- public TableMeta getTableMeta(Path tablePath) throws IOException {
- TableMeta meta;
-
- FileSystem fs = tablePath.getFileSystem(conf);
- Path tableMetaPath = new Path(tablePath, ".meta");
- if (!fs.exists(tableMetaPath)) {
- throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
- }
-
- FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
- CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
- CatalogProtos.TableProto.getDefaultInstance());
- meta = new TableMeta(tableProto);
-
- return meta;
- }
-
- public FileFragment[] split(String tableName) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fragmentSize);
- }
-
- public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
- listTablets.add(tablet);
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public FileFragment[] split(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
- }
-
- public FileFragment[] split(String tableName, Path tablePath) throws IOException {
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- private FileFragment[] split(String tableName, Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
- } else {
- listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
- Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
- } else {
- listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public long calculateSize(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- long totalSize = 0;
-
- if (fs.exists(tablePath)) {
- totalSize = fs.getContentSummary(tablePath).getLength();
- }
-
- return totalSize;
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // FileInputFormat Area
- /////////////////////////////////////////////////////////////////////////////
-
- private static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /**
- * Proxy PathFilter that accepts a path only if all filters given in the
- * constructor do. Used by the listPaths() to apply the built-in
- * hiddenFileFilter together with a user provided one (if any).
- */
- private static class MultiPathFilter implements PathFilter {
- private List<PathFilter> filters;
-
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
-
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (!filter.accept(path)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * List input directories.
- * Subclasses may override to, e.g., select only files matching a regular
- * expression.
- *
- * @return array of FileStatus objects
- * @throws IOException if zero items.
- */
- protected List<FileStatus> listStatus(Path path) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- Path[] dirs = new Path[]{path};
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
-
- List<IOException> errors = new ArrayList<IOException>();
-
- // creates a MultiPathFilter with the hiddenFileFilter and the
- // user provided one (if any).
- List<PathFilter> filters = new ArrayList<PathFilter>();
- filters.add(hiddenFileFilter);
-
- PathFilter inputFilter = new MultiPathFilter(filters);
-
- for (int i = 0; i < dirs.length; ++i) {
- Path p = dirs[i];
-
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] matches = fs.globStatus(p, inputFilter);
- if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + p));
- } else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
- } else {
- for (FileStatus globStat : matches) {
- if (globStat.isDirectory()) {
- for (FileStatus stat : fs.listStatus(globStat.getPath(),
- inputFilter)) {
- result.add(stat);
- }
- } else {
- result.add(globStat);
- }
- }
- }
- }
-
- if (!errors.isEmpty()) {
- throw new InvalidInputException(errors);
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- /**
- * Get the lower bound on split size imposed by the format.
- *
- * @return the number of bytes of the minimal split for this format
- */
- protected long getFormatMinSplitSize() {
- return 1;
- }
-
- /**
- * Is the given filename splitable? Usually, true, but if the file is
- * stream compressed, it will not be.
- * <p/>
- * <code>FileInputFormat</code> implementations can override this and return
- * <code>false</code> to ensure that individual input files are never split-up
- * so that Mappers process entire files.
- *
- *
- * @param filename the file name to check
- * @return is this file isSplittable?
- */
- protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException {
- Scanner scanner = getFileScanner(meta, schema, filename);
- return scanner.isSplittable();
- }
-
- @Deprecated
- protected long computeSplitSize(long blockSize, long minSize,
- long maxSize) {
- return Math.max(minSize, Math.min(maxSize, blockSize));
- }
-
- @Deprecated
- private static final double SPLIT_SLOP = 1.1; // 10% slop
-
- @Deprecated
- protected int getBlockIndex(BlockLocation[] blkLocations,
- long offset) {
- for (int i = 0; i < blkLocations.length; i++) {
- // is the offset inside this block?
- if ((blkLocations[i].getOffset() <= offset) &&
- (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
- return i;
- }
- }
- BlockLocation last = blkLocations[blkLocations.length - 1];
- long fileLength = last.getOffset() + last.getLength() - 1;
- throw new IllegalArgumentException("Offset " + offset +
- " is outside of file (0.." +
- fileLength + ")");
- }
-
- /**
- * A factory that makes the split for this class. It can be overridden
- * by sub-classes to make sub-types
- */
- protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
- return new FileFragment(fragmentId, file, start, length);
- }
-
- protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
- int[] diskIds) throws IOException {
- return new FileFragment(fragmentId, file, blockLocation, diskIds);
- }
-
- // for Non Splittable. eg, compressed gzip TextFile
- protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
- BlockLocation[] blkLocations) throws IOException {
-
- Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
- for (BlockLocation blockLocation : blkLocations) {
- for (String host : blockLocation.getHosts()) {
- if (hostsBlockMap.containsKey(host)) {
- hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
- } else {
- hostsBlockMap.put(host, 1);
- }
- }
- }
-
- List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
- Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
- @Override
- public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
- return v1.getValue().compareTo(v2.getValue());
- }
- });
-
- String[] hosts = new String[blkLocations[0].getHosts().length];
-
- for (int i = 0; i < hosts.length; i++) {
- Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
- hosts[i] = entry.getKey();
- }
- return new FileFragment(fragmentId, file, start, length, hosts);
- }
-
- /**
- * Get the maximum split size.
- *
- * @return the maximum number of bytes a split can include
- */
- @Deprecated
- public static long getMaxSplitSize() {
- // TODO - to be configurable
- return 536870912L;
- }
-
- /**
- * Get the minimum split size
- *
- * @return the minimum number of bytes that can be in a split
- */
- @Deprecated
- public static long getMinSplitSize() {
- // TODO - to be configurable
- return 67108864L;
- }
-
- /**
- * Get Disk Ids by Volume Bytes
- */
- private int[] getDiskIds(VolumeId[] volumeIds) {
- int[] diskIds = new int[volumeIds.length];
- for (int i = 0; i < volumeIds.length; i++) {
- int diskId = -1;
- if (volumeIds[i] != null && volumeIds[i].isValid()) {
- String volumeIdString = volumeIds[i].toString();
- byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
-
- if (volumeIdBytes.length == 4) {
- diskId = Bytes.toInt(volumeIdBytes);
- } else if (volumeIdBytes.length == 1) {
- diskId = (int) volumeIdBytes[0]; // support hadoop-2.0.2
- }
- }
- diskIds[i] = diskId;
- }
- return diskIds;
- }
-
- /**
- * Generate the map of host and make them into Volume Ids.
- *
- */
- private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
- Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
- for (FileFragment frag : frags) {
- String[] hosts = frag.getHosts();
- int[] diskIds = frag.getDiskIds();
- for (int i = 0; i < hosts.length; i++) {
- Set<Integer> volumeList = volumeMap.get(hosts[i]);
- if (volumeList == null) {
- volumeList = new HashSet<Integer>();
- volumeMap.put(hosts[i], volumeList);
- }
-
- if (diskIds.length > 0 && diskIds[i] > -1) {
- volumeList.add(diskIds[i]);
- }
- }
- }
-
- return volumeMap;
- }
- /**
- * Generate the list of files and make them into FileSplits.
- *
- * @throws IOException
- */
- public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
- // generate splits'
-
- List<FileFragment> splits = new ArrayList<FileFragment>();
- FileSystem fs = inputPath.getFileSystem(conf);
- List<FileStatus> files;
- if (fs.isFile(inputPath)) {
- files = Lists.newArrayList(fs.getFileStatus(inputPath));
- } else {
- files = listStatus(inputPath);
- }
- for (FileStatus file : files) {
- Path path = file.getPath();
- long length = file.getLen();
- if (length > 0) {
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- boolean splittable = isSplittable(meta, schema, path);
- if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
- // supported disk volume
- BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
- .getFileBlockStorageLocations(Arrays.asList(blkLocations));
- if (splittable) {
- for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
- splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
- .getVolumeIds())));
- }
- } else { // Non splittable
- long blockSize = blockStorageLocations[0].getLength();
- if (blockSize >= length) {
- for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
- splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
- .getVolumeIds())));
- }
- } else {
- splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
- }
- }
-
- } else {
- if (splittable) {
- for (BlockLocation blockLocation : blkLocations) {
- splits.add(makeSplit(tableName, meta, path, blockLocation, null));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
- }
- }
- } else {
- //for zero length files
- splits.add(makeSplit(tableName, meta, path, 0, length));
- }
- }
-
- LOG.info("Total # of splits: " + splits.size());
- return splits;
- }
-
- private static class InvalidInputException extends IOException {
- List<IOException> errors;
- public InvalidInputException(List<IOException> errors) {
- this.errors = errors;
- }
-
- @Override
- public String getMessage(){
- StringBuffer sb = new StringBuffer();
- int messageLimit = Math.min(errors.size(), 10);
- for (int i = 0; i < messageLimit ; i ++) {
- sb.append(errors.get(i).getMessage()).append("\n");
- }
-
- if(messageLimit < errors.size())
- sb.append("skipped .....").append("\n");
-
- return sb.toString();
- }
- }
-
- private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
- Configuration.class,
- Schema.class,
- TableMeta.class,
- FileFragment.class
- };
-
- private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
- Configuration.class,
- Schema.class,
- TableMeta.class,
- Path.class
- };
-
- /**
- * create a scanner instance.
- */
- public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
- Fragment fragment) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
-
- /**
- * create a scanner instance.
- */
- public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema,
- Path path) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, schema, meta, path});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
deleted file mode 100644
index ed6ea34..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.statistics.TableStats;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface Appender extends Closeable {
-
- void init() throws IOException;
-
- void addTuple(Tuple t) throws IOException;
-
- void flush() throws IOException;
-
- void close() throws IOException;
-
- void enableStats();
-
- TableStats getStats();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
deleted file mode 100644
index ed034be..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class BinarySerializerDeserializer implements SerializerDeserializer {
-
- static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
-
- @Override
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
- throws IOException {
- byte[] bytes;
- int length = 0;
- if (datum == null || datum instanceof NullDatum) {
- return 0;
- }
-
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- case BIT:
- case CHAR:
- bytes = datum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case INT2:
- length = writeShort(out, datum.asInt2());
- break;
- case INT4:
- length = writeVLong(out, datum.asInt4());
- break;
- case INT8:
- length = writeVLong(out, datum.asInt8());
- break;
- case FLOAT4:
- length = writeFloat(out, datum.asFloat4());
- break;
- case FLOAT8:
- length = writeDouble(out, datum.asFloat8());
- break;
- case TEXT: {
- bytes = datum.asTextBytes();
- length = datum.size();
- if (length == 0) {
- bytes = INVALID_UTF__SINGLE_BYTE;
- length = INVALID_UTF__SINGLE_BYTE.length;
- }
- out.write(bytes, 0, bytes.length);
- break;
- }
- case BLOB:
- case INET4:
- case INET6:
- bytes = datum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case PROTOBUF:
- ProtobufDatum protobufDatum = (ProtobufDatum) datum;
- bytes = protobufDatum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case NULL_TYPE:
- break;
- default:
- throw new IOException("Does not support type");
- }
- return length;
- }
-
- @Override
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
- if (length == 0) return NullDatum.get();
-
- Datum datum;
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- datum = DatumFactory.createBool(bytes[offset]);
- break;
- case BIT:
- datum = DatumFactory.createBit(bytes[offset]);
- break;
- case CHAR: {
- byte[] chars = new byte[length];
- System.arraycopy(bytes, offset, chars, 0, length);
- datum = DatumFactory.createChar(chars);
- break;
- }
- case INT2:
- datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
- break;
- case INT4:
- datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
- break;
- case INT8:
- datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
- break;
- case FLOAT4:
- datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
- break;
- case FLOAT8:
- datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
- break;
- case TEXT: {
- byte[] chars = new byte[length];
- System.arraycopy(bytes, offset, chars, 0, length);
-
- if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
- datum = DatumFactory.createText(new byte[0]);
- } else {
- datum = DatumFactory.createText(chars);
- }
- break;
- }
- case PROTOBUF: {
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
- Message.Builder builder = factory.newBuilder();
- builder.mergeFrom(bytes, offset, length);
- datum = factory.createDatum(builder);
- break;
- }
- case INET4:
- datum = DatumFactory.createInet4(bytes, offset, length);
- break;
- case BLOB:
- datum = DatumFactory.createBlob(bytes, offset, length);
- break;
- default:
- datum = NullDatum.get();
- }
- return datum;
- }
-
- private byte[] shortBytes = new byte[2];
-
- public int writeShort(OutputStream out, short val) throws IOException {
- shortBytes[0] = (byte) (val >> 8);
- shortBytes[1] = (byte) val;
- out.write(shortBytes, 0, 2);
- return 2;
- }
-
- public float toFloat(byte[] bytes, int offset, int length) {
- Preconditions.checkArgument(length == 4);
-
- int val = ((bytes[offset] & 0x000000FF) << 24) +
- ((bytes[offset + 1] & 0x000000FF) << 16) +
- ((bytes[offset + 2] & 0x000000FF) << 8) +
- (bytes[offset + 3] & 0x000000FF);
- return Float.intBitsToFloat(val);
- }
-
- private byte[] floatBytes = new byte[4];
-
- public int writeFloat(OutputStream out, float f) throws IOException {
- int val = Float.floatToIntBits(f);
-
- floatBytes[0] = (byte) (val >> 24);
- floatBytes[1] = (byte) (val >> 16);
- floatBytes[2] = (byte) (val >> 8);
- floatBytes[3] = (byte) val;
- out.write(floatBytes, 0, 4);
- return floatBytes.length;
- }
-
- public double toDouble(byte[] bytes, int offset, int length) {
- Preconditions.checkArgument(length == 8);
- long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
- ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
- ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
- ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
- ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
- ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
- ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
- (long) (bytes[offset + 7] & 0x00000000000000FF);
- return Double.longBitsToDouble(val);
- }
-
- private byte[] doubleBytes = new byte[8];
-
- public int writeDouble(OutputStream out, double d) throws IOException {
- long val = Double.doubleToLongBits(d);
-
- doubleBytes[0] = (byte) (val >> 56);
- doubleBytes[1] = (byte) (val >> 48);
- doubleBytes[2] = (byte) (val >> 40);
- doubleBytes[3] = (byte) (val >> 32);
- doubleBytes[4] = (byte) (val >> 24);
- doubleBytes[5] = (byte) (val >> 16);
- doubleBytes[6] = (byte) (val >> 8);
- doubleBytes[7] = (byte) val;
- out.write(doubleBytes, 0, 8);
- return doubleBytes.length;
- }
-
- private byte[] vLongBytes = new byte[9];
-
- public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
- if (l >= -112 && l <= 127) {
- bytes[offset] = (byte) l;
- return 1;
- }
-
- int len = -112;
- if (l < 0) {
- l ^= -1L; // take one's complement'
- len = -120;
- }
-
- long tmp = l;
- while (tmp != 0) {
- tmp = tmp >> 8;
- len--;
- }
-
- bytes[offset++] = (byte) len;
- len = (len < -120) ? -(len + 120) : -(len + 112);
-
- for (int idx = len; idx != 0; idx--) {
- int shiftbits = (idx - 1) * 8;
- bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
- }
- return 1 + len;
- }
-
- public int writeVLong(OutputStream out, long l) throws IOException {
- int len = writeVLongToByteArray(vLongBytes, 0, l);
- out.write(vLongBytes, 0, len);
- return len;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
deleted file mode 100644
index 5d05d6f..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.Bytes;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-public class CSVFile {
-
- public static final String DELIMITER = "csvfile.delimiter";
- public static final String NULL = "csvfile.null"; //read only
- public static final String SERDE = "csvfile.serde";
- public static final String DELIMITER_DEFAULT = "|";
- public static final byte LF = '\n';
- public static int EOF = -1;
-
- private static final Log LOG = LogFactory.getLog(CSVFile.class);
-
- public static class CSVAppender extends FileAppender {
- private final TableMeta meta;
- private final Schema schema;
- private final int columnNum;
- private final FileSystem fs;
- private FSDataOutputStream fos;
- private DataOutputStream outputStream;
- private CompressionOutputStream deflateFilter;
- private char delimiter;
- private TableStatistics stats = null;
- private Compressor compressor;
- private CompressionCodecFactory codecFactory;
- private CompressionCodec codec;
- private Path compressedPath;
- private byte[] nullChars;
- private int BUFFER_SIZE = 128 * 1024;
- private int bufferedBytes = 0;
- private long pos = 0;
-
- private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
- private SerializerDeserializer serde;
-
- public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
- super(conf, schema, meta, path);
- this.fs = path.getFileSystem(conf);
- this.meta = meta;
- this.schema = schema;
- this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0);
- this.columnNum = schema.getColumnNum();
- String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
- if (StringUtils.isEmpty(nullCharacters)) {
- nullChars = NullDatum.get().asTextBytes();
- } else {
- nullChars = nullCharacters.getBytes();
- }
- }
-
- @Override
- public void init() throws IOException {
- if (!fs.exists(path.getParent())) {
- throw new FileNotFoundException(path.toString());
- }
-
- String codecName = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
- if(!StringUtils.isEmpty(codecName)){
- codecFactory = new CompressionCodecFactory(conf);
- codec = codecFactory.getCodecByClassName(codecName);
- compressor = CodecPool.getCompressor(codec);
- if(compressor != null) compressor.reset(); //builtin gzip is null
-
- String extension = codec.getDefaultExtension();
- compressedPath = path.suffix(extension);
-
- if (fs.exists(compressedPath)) {
- throw new AlreadyExistsStorageException(compressedPath);
- }
-
- fos = fs.create(compressedPath);
- deflateFilter = codec.createOutputStream(fos, compressor);
- outputStream = new DataOutputStream(deflateFilter);
-
- } else {
- if (fs.exists(path)) {
- throw new AlreadyExistsStorageException(path);
- }
- fos = fs.create(path);
- outputStream = new DataOutputStream(new BufferedOutputStream(fos));
- }
-
- if (enabledStats) {
- this.stats = new TableStatistics(this.schema);
- }
-
- try {
- String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
- serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new IOException(e);
- }
-
- os.reset();
- pos = fos.getPos();
- bufferedBytes = 0;
- super.init();
- }
-
-
- @Override
- public void addTuple(Tuple tuple) throws IOException {
- Datum datum;
- int rowBytes = 0;
-
- for (int i = 0; i < columnNum; i++) {
- datum = tuple.get(i);
- rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
-
- if(columnNum - 1 > i){
- os.write((byte) delimiter);
- rowBytes += 1;
- }
- if (enabledStats) {
- stats.analyzeField(i, datum);
- }
- }
- os.write(LF);
- rowBytes += 1;
-
- pos += rowBytes;
- bufferedBytes += rowBytes;
- if(bufferedBytes > BUFFER_SIZE){
- flushBuffer();
- }
- // Statistical section
- if (enabledStats) {
- stats.incrementRow();
- }
- }
-
- private void flushBuffer() throws IOException {
- if(os.getLength() > 0) {
- os.writeTo(outputStream);
- os.reset();
- bufferedBytes = 0;
- }
- }
- @Override
- public long getOffset() throws IOException {
- return pos;
- }
-
- @Override
- public void flush() throws IOException {
- flushBuffer();
- outputStream.flush();
- }
-
- @Override
- public void close() throws IOException {
-
- try {
- flush();
-
- // Statistical section
- if (enabledStats) {
- stats.setNumBytes(getOffset());
- }
-
- if(deflateFilter != null) {
- deflateFilter.finish();
- deflateFilter.resetState();
- deflateFilter = null;
- }
-
- os.close();
- } finally {
- IOUtils.cleanup(LOG, fos);
- if (compressor != null) {
- CodecPool.returnCompressor(compressor);
- compressor = null;
- }
- }
- }
-
- @Override
- public TableStats getStats() {
- if (enabledStats) {
- return stats.getTableStat();
- } else {
- return null;
- }
- }
-
- public boolean isCompress() {
- return compressor != null;
- }
-
- public String getExtension() {
- return codec != null ? codec.getDefaultExtension() : "";
- }
- }
-
- public static class CSVScanner extends FileScanner implements SeekableScanner {
- public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
- throws IOException {
- super(conf, schema, meta, fragment);
- factory = new CompressionCodecFactory(conf);
- codec = factory.getCodec(fragment.getPath());
- if (codec == null || codec instanceof SplittableCompressionCodec) {
- splittable = true;
- }
-
- //Delimiter
- String delim = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
- this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
-
- String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
- if (StringUtils.isEmpty(nullCharacters)) {
- nullChars = NullDatum.get().asTextBytes();
- } else {
- nullChars = nullCharacters.getBytes();
- }
- }
-
- private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
- private char delimiter;
- private FileSystem fs;
- private FSDataInputStream fis;
- private InputStream is; //decompressd stream
- private CompressionCodecFactory factory;
- private CompressionCodec codec;
- private Decompressor decompressor;
- private Seekable filePosition;
- private boolean splittable = false;
- private long startOffset, end, pos;
- private int currentIdx = 0, validIdx = 0, recordCount = 0;
- private int[] targetColumnIndexes;
- private boolean eof = false;
- private final byte[] nullChars;
- private SplitLineReader reader;
- private ArrayList<Long> fileOffsets = new ArrayList<Long>();
- private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
- private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
- private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
- private SerializerDeserializer serde;
-
- @Override
- public void init() throws IOException {
-
- // FileFragment information
- if(fs == null) {
- fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
- }
- if(fis == null) fis = fs.open(fragment.getPath());
-
- recordCount = 0;
- pos = startOffset = fragment.getStartKey();
- end = startOffset + fragment.getEndKey();
-
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
- fis, decompressor, startOffset, end,
- SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
- reader = new CompressedSplitLineReader(cIn, conf, null);
- startOffset = cIn.getAdjustedStart();
- end = cIn.getAdjustedEnd();
- filePosition = cIn;
- is = cIn;
- } else {
- is = new DataInputStream(codec.createInputStream(fis, decompressor));
- reader = new SplitLineReader(is, null);
- filePosition = fis;
- }
- } else {
- fis.seek(startOffset);
- filePosition = fis;
- is = fis;
- reader = new SplitLineReader(is, null);
- }
-
- if (targets == null) {
- targets = schema.toArray();
- }
-
- targetColumnIndexes = new int[targets.length];
- for (int i = 0; i < targets.length; i++) {
- targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
- }
-
- try {
- String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
- serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new IOException(e);
- }
-
- super.init();
- Arrays.sort(targetColumnIndexes);
- if (LOG.isDebugEnabled()) {
- LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
- "," + fs.getFileStatus(fragment.getPath()).getLen());
- }
-
- if (startOffset != 0) {
- startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
- pos = startOffset;
- }
- eof = false;
- page();
- }
-
- private int maxBytesToConsume(long pos) {
- return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
- }
-
- private long fragmentable() throws IOException {
- return end - getFilePosition();
- }
-
- private long getFilePosition() throws IOException {
- long retVal;
- if (isCompress()) {
- retVal = filePosition.getPos();
- } else {
- retVal = pos;
- }
- return retVal;
- }
-
- private void page() throws IOException {
-// // Index initialization
- currentIdx = 0;
- validIdx = 0;
- int currentBufferPos = 0;
- int bufferedSize = 0;
-
- buffer.reset();
- startOffsets.clear();
- rowLengthList.clear();
- fileOffsets.clear();
-
- if(eof) return;
-
- while (DEFAULT_PAGE_SIZE >= bufferedSize){
-
- int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
-
- if(ret == 0){
- break;
- } else {
- fileOffsets.add(pos);
- pos += ret;
- startOffsets.add(currentBufferPos);
- currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
- bufferedSize += ret;
- validIdx++;
- recordCount++;
- }
-
- if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
- eof = true;
- break;
- }
- }
- }
-
- @Override
- public Tuple next() throws IOException {
- try {
- if (currentIdx == validIdx) {
- if (eof) {
- return null;
- } else {
- page();
-
- if(currentIdx == validIdx){
- return null;
- }
- }
- }
-
- long offset = -1;
- if(!isCompress()){
- offset = fileOffsets.get(currentIdx);
- }
-
- byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
- rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
- currentIdx++;
- return new LazyTuple(schema, cells, offset, nullChars, serde);
- } catch (Throwable t) {
- LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
- LOG.error("Tuple list current index: " + currentIdx, t);
- throw new IOException(t);
- }
- }
-
- private boolean isCompress() {
- return codec != null;
- }
-
- @Override
- public void reset() throws IOException {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- decompressor = null;
- }
-
- init();
- }
-
- @Override
- public void close() throws IOException {
- try {
- IOUtils.cleanup(LOG, reader, is, fis);
- fs = null;
- is = null;
- fis = null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("CSVScanner processed record:" + recordCount);
- }
- } finally {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- decompressor = null;
- }
- }
- }
-
- @Override
- public boolean isProjectable() {
- return true;
- }
-
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- @Override
- public void setSearchCondition(Object expr) {
- }
-
- @Override
- public void seek(long offset) throws IOException {
- if(isCompress()) throw new UnsupportedException();
-
- int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
-
- if (tupleIndex > -1) {
- this.currentIdx = tupleIndex;
- } else if (isSplittable() && end >= offset || startOffset <= offset) {
- eof = false;
- fis.seek(offset);
- pos = offset;
- reader.reset();
- this.currentIdx = 0;
- this.validIdx = 0;
- // pageBuffer();
- } else {
- throw new IOException("invalid offset " +
- " < start : " + startOffset + " , " +
- " end : " + end + " , " +
- " filePos : " + filePosition.getPos() + " , " +
- " input offset : " + offset + " >");
- }
- }
-
- @Override
- public long getNextOffset() throws IOException {
- if(isCompress()) throw new UnsupportedException();
-
- if (this.currentIdx == this.validIdx) {
- if (fragmentable() <= 0) {
- return -1;
- } else {
- page();
- if(currentIdx == validIdx) return -1;
- }
- }
- return fileOffsets.get(currentIdx);
- }
-
- @Override
- public boolean isSplittable(){
- return splittable;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
deleted file mode 100644
index 4f58e68..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * Line reader for compressed splits
- *
- * Reading records from a compressed split is tricky, as the
- * LineRecordReader is using the reported compressed input stream
- * position directly to determine when a split has ended. In addition the
- * compressed input stream is usually faking the actual byte position, often
- * updating it only after the first compressed block after the split is
- * accessed.
- *
- * Depending upon where the last compressed block of the split ends relative
- * to the record delimiters it can be easy to accidentally drop the last
- * record or duplicate the last record between this split and the next.
- *
- * Split end scenarios:
- *
- * 1) Last block of split ends in the middle of a record
- * Nothing special that needs to be done here, since the compressed input
- * stream will report a position after the split end once the record
- * is fully read. The consumer of the next split will discard the
- * partial record at the start of the split normally, and no data is lost
- * or duplicated between the splits.
- *
- * 2) Last block of split ends in the middle of a delimiter
- * The line reader will continue to consume bytes into the next block to
- * locate the end of the delimiter. If a custom delimiter is being used
- * then the next record must be read by this split or it will be dropped.
- * The consumer of the next split will not recognize the partial
- * delimiter at the beginning of its split and will discard it along with
- * the next record.
- *
- * However for the default delimiter processing there is a special case
- * because CR, LF, and CRLF are all valid record delimiters. If the
- * block ends with a CR then the reader must peek at the next byte to see
- * if it is an LF and therefore part of the same record delimiter.
- * Peeking at the next byte is an access to the next block and triggers
- * the stream to report the end of the split. There are two cases based
- * on the next byte:
- *
- * A) The next byte is LF
- * The split needs to end after the current record is returned. The
- * consumer of the next split will discard the first record, which
- * is degenerate since LF is itself a delimiter, and start consuming
- * records after that byte. If the current split tries to read
- * another record then the record will be duplicated between splits.
- *
- * B) The next byte is not LF
- * The current record will be returned but the stream will report
- * the split has ended due to the peek into the next block. If the
- * next record is not read then it will be lost, as the consumer of
- * the next split will discard it before processing subsequent
- * records. Therefore the next record beyond the reported split end
- * must be consumed by this split to avoid data loss.
- *
- * 3) Last block of split ends at the beginning of a delimiter
- * This is equivalent to case 1, as the reader will consume bytes into
- * the next block and trigger the end of the split. No further records
- * should be read as the consumer of the next split will discard the
- * (degenerate) record at the beginning of its split.
- *
- * 4) Last block of split ends at the end of a delimiter
- * Nothing special needs to be done here. The reader will not start
- * examining the bytes into the next block until the next record is read,
- * so the stream will not report the end of the split just yet. Once the
- * next record is read then the next block will be accessed and the
- * stream will indicate the end of the split. The consumer of the next
- * split will correctly discard the first record of its split, and no
- * data is lost or duplicated.
- *
- * If the default delimiter is used and the block ends at a CR then this
- * is treated as case 2 since the reader does not yet know without
- * looking at subsequent bytes whether the delimiter has ended.
- *
- * NOTE: It is assumed that compressed input streams *never* return bytes from
- * multiple compressed blocks from a single read. Failure to do so will
- * violate the buffering performed by this class, as it will access
- * bytes into the next block after the split before returning all of the
- * records from the previous block.
- */
-
-public class CompressedSplitLineReader extends SplitLineReader {
- SplitCompressionInputStream scin;
- private boolean usingCRLF;
- private boolean needAdditionalRecord = false;
- private boolean finished = false;
-
- public CompressedSplitLineReader(SplitCompressionInputStream in,
- Configuration conf,
- byte[] recordDelimiterBytes)
- throws IOException {
- super(in, conf, recordDelimiterBytes);
- scin = in;
- usingCRLF = (recordDelimiterBytes == null);
- }
-
- @Override
- protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
- throws IOException {
- int bytesRead = in.read(buffer);
-
- // If the split ended in the middle of a record delimiter then we need
- // to read one additional record, as the consumer of the next split will
- // not recognize the partial delimiter as a record.
- // However if using the default delimiter and the next character is a
- // linefeed then next split will treat it as a delimiter all by itself
- // and the additional record read should not be performed.
- if (inDelimiter && bytesRead > 0) {
- if (usingCRLF) {
- needAdditionalRecord = (buffer[0] != '\n');
- } else {
- needAdditionalRecord = true;
- }
- }
- return bytesRead;
- }
-
- @Override
- public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
- throws IOException {
- int bytesRead = 0;
- if (!finished) {
- // only allow at most one more record to be read after the stream
- // reports the split ended
- if (scin.getPos() > scin.getAdjustedEnd()) {
- finished = true;
- }
-
- bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
- }
- return bytesRead;
- }
-
- @Override
- public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
- , int maxBytesToConsume) throws IOException {
- int bytesRead = 0;
- if (!finished) {
- // only allow at most one more record to be read after the stream
- // reports the split ended
- if (scin.getPos() > scin.getAdjustedEnd()) {
- finished = true;
- }
-
- bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
- }
- return bytesRead;
- }
-
- @Override
- public boolean needAdditionalRecordAfterSplit() {
- return !finished && needAdditionalRecord;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
deleted file mode 100644
index 8841a31..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
/**
 * An immutable (host, volumeId) pair identifying where a piece of data is
 * physically stored.
 */
public class DataLocation {
  private final String host;
  private final int volumeId;

  /**
   * @param host     host name holding the data
   * @param volumeId identifier of the disk volume on that host
   */
  public DataLocation(String host, int volumeId) {
    this.host = host;
    this.volumeId = volumeId;
  }

  /** Returns the host name. */
  public String getHost() {
    return host;
  }

  /** Returns the volume identifier. */
  public int getVolumeId() {
    return volumeId;
  }

  @Override
  public String toString() {
    return String.format("DataLocation{host=%s, volumeId=%d}", host, volumeId);
  }
}