You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:44 UTC
[29/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index f807987..8a61cab 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -70,7 +70,7 @@ public class TestSortExec {
util = TpchTestBase.getInstance().getTestingCluster();
catalog = util.getMaster().getCatalog();
workDir = CommonTestingUtil.getTestDir(TEST_PATH);
- sm = StorageManager.getFileStorageManager(conf, workDir);
+ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir);
Schema schema = new Schema();
schema.addColumn("managerid", Type.INT4);
@@ -82,7 +82,8 @@ public class TestSortExec {
tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
sm.getFileSystem().mkdirs(tablePath.getParent());
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(employeeMeta, schema, tablePath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(employeeMeta, schema, tablePath);
appender.init();
Tuple tuple = new VTuple(schema.size());
for (int i = 0; i < 100; i++) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index b74f634..68b3fb3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -565,8 +565,8 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
}
Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
fileIndex++;
- appender = StorageManager.getFileStorageManager(conf).getAppender(tableMeta, schema,
- dataPath);
+ appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(tableMeta, schema, dataPath);
appender.init();
}
String[] columnDatas = rows[i].split("\\|");
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index ac5ff13..b8f3ef7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -63,7 +63,7 @@ public class TestResultSet {
public static void setup() throws Exception {
util = TpchTestBase.getInstance().getTestingCluster();
conf = util.getConfiguration();
- sm = StorageManager.getFileStorageManager(conf);
+ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
scoreSchema = new Schema();
scoreSchema.addColumn("deptname", Type.TEXT);
@@ -73,8 +73,7 @@ public class TestResultSet {
Path p = sm.getTablePath("score");
sm.getFileSystem().mkdirs(p);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(scoreMeta, scoreSchema,
- new Path(p, "score"));
+ Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score"));
appender.init();
int deptSize = 100;
int tupleNum = 10000;
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 2aa56db..f36ff24 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -36,7 +36,6 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.FileUtil;
import org.junit.After;
import org.junit.Before;
@@ -70,7 +69,8 @@ public class TestRowFile {
TableMeta meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
- FileStorageManager sm = StorageManager.getFileStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR)));
+ FileStorageManager sm =
+ (FileStorageManager)StorageManager.getFileStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR)));
Path tablePath = new Path("/test");
Path metaPath = new Path(tablePath, ".meta");
@@ -80,7 +80,7 @@ public class TestRowFile {
FileUtil.writeProto(fs, metaPath, meta.getProto());
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, dataPath);
+ Appender appender = sm.getAppender(meta, schema, dataPath);
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 517f425..5a93538 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -185,7 +185,7 @@ public class TestRangeRetrieverHandler {
reader.open();
TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema,
+ SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema,
StorageUtil.concatPath(testDir, "output", "output"));
scanner.init();
@@ -308,7 +308,7 @@ public class TestRangeRetrieverHandler {
new Path(testDir, "output/index"), keySchema, comp);
reader.open();
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
- SeekableScanner scanner = StorageManager.getSeekableScanner(conf, outputMeta, schema,
+ SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, outputMeta, schema,
StorageUtil.concatPath(testDir, "output", "output"));
scanner.init();
int cnt = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d350889..eb8ada9 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -75,7 +75,12 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-storage</artifactId>
+ <artifactId>tajo-storage-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-hdfs</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml
index 20cdf16..1c3c410 100644
--- a/tajo-jdbc/pom.xml
+++ b/tajo-jdbc/pom.xml
@@ -102,7 +102,11 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-storage</artifactId>
+ <artifactId>tajo-storage-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index a82aa46..82ccbdc 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -744,6 +744,22 @@
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-storage</artifactId>
<version>${tajo.version}</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-common</artifactId>
+ <version>${tajo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-hdfs</artifactId>
+ <version>${tajo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-hbase</artifactId>
+ <version>${tajo.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index dee429f..8acb1a9 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -1,5 +1,5 @@
-<!--
<?xml version="1.0" encoding="UTF-8"?>
+<!--
Copyright 2012 Database Lab., Korea Univ.
Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,328 +16,47 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>tajo-project</artifactId>
<groupId>org.apache.tajo</groupId>
<version>0.9.1-SNAPSHOT</version>
<relativePath>../tajo-project</relativePath>
</parent>
-
+ <modelVersion>4.0.0</modelVersion>
<artifactId>tajo-storage</artifactId>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
<name>Tajo Storage</name>
- <description>Tajo Storage Package</description>
-
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <parquet.version>1.5.0</parquet.version>
- <parquet.format.version>2.1.0</parquet.format.version>
</properties>
- <repositories>
- <repository>
- <id>repository.jboss.org</id>
- <url>https://repository.jboss.org/nexus/content/repositories/releases/
- </url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
+ <modules>
+ <module>tajo-storage-common</module>
+ <module>tajo-storage-hdfs</module>
+ <module>tajo-storage-hbase</module>
+ </modules>
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- <plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
- <executions>
- <execution>
- <phase>verify</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <excludes>
- <exclude>src/test/resources/testVariousTypes.avsc</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <systemProperties>
- <tajo.test>TRUE</tajo.test>
- </systemProperties>
- <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
- </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
+ <artifactId>maven-surefire-report-plugin</artifactId>
</plugin>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <id>create-protobuf-generated-sources-directory</id>
- <phase>initialize</phase>
- <configuration>
- <target>
- <mkdir dir="target/generated-sources/proto" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>1.2</version>
- <executions>
- <execution>
- <id>generate-sources</id>
- <phase>generate-sources</phase>
- <configuration>
- <executable>protoc</executable>
- <arguments>
- <argument>-Isrc/main/proto/</argument>
- <argument>--proto_path=../tajo-common/src/main/proto</argument>
- <argument>--proto_path=../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
- <argument>--java_out=target/generated-sources/proto</argument>
- <argument>src/main/proto/IndexProtos.proto</argument>
- <argument>src/main/proto/StorageFragmentProtos.proto</argument>
- </arguments>
- </configuration>
- <goals>
- <goal>exec</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>target/generated-sources/proto</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-pmd-plugin</artifactId>
- <version>2.7.1</version>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
</plugin>
</plugins>
</build>
- <dependencies>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-catalog-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-plan</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <version>1.7.7</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>zookeeper</artifactId>
- <groupId>org.apache.zookeeper</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-json</artifactId>
- <groupId>com.sun.jersey</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey.jersey-test-framework</groupId>
- <artifactId>jersey-test-framework-grizzly2</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey.jersey-test-framework</groupId>
- <artifactId>jersey-test-framework-grizzly2</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-server-tests</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-app</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-api</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-hs</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-column</artifactId>
- <version>${parquet.version}</version>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-hadoop</artifactId>
- <version>${parquet.version}</version>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>parquet-format</artifactId>
- <version>${parquet.format.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- </dependencies>
-
<profiles>
<profile>
<id>docs</id>
@@ -382,7 +101,7 @@
<executions>
<execution>
<id>dist</id>
- <phase>package</phase>
+ <phase>prepare-package</phase>
<goals>
<goal>run</goal>
</goals>
@@ -405,12 +124,15 @@
echo
echo "Current directory `pwd`"
echo
- run rm -rf ${project.artifactId}-${project.version}
- run mkdir ${project.artifactId}-${project.version}
- run cd ${project.artifactId}-${project.version}
- run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+ run rm -rf tajo-storage-${project.version}
+ run mkdir tajo-storage-${project.version}
+ run cd tajo-storage-${project.version}
+ run cp -r ${basedir}/tajo-storage-common/target/tajo-storage-common-${project.version}*.jar .
+ run cp -r ${basedir}/tajo-storage-hdfs/target/tajo-storage-hdfs-${project.version}*.jar .
+ run cp -r ${basedir}/tajo-storage-hbase/target/tajo-storage-hbase-${project.version}*.jar .
+
echo
- echo "Tajo Storage dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+ echo "Tajo Storage dist layout available at: ${project.build.directory}/tajo-storage-${project.version}"
echo
</echo>
<exec executable="sh" dir="${project.build.directory}" failonerror="true">
@@ -430,11 +152,7 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
- </configuration>
+ <artifactId>maven-surefire-report-plugin</artifactId>
</plugin>
</plugins>
</reporting>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
deleted file mode 100644
index c5e96ac..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.statistics.TableStats;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface Appender extends Closeable {
-
- void init() throws IOException;
-
- void addTuple(Tuple t) throws IOException;
-
- void flush() throws IOException;
-
- long getEstimatedOutputSize() throws IOException;
-
- void close() throws IOException;
-
- void enableStats();
-
- TableStats getStats();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
deleted file mode 100644
index b829f60..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.datum.Datum;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * The Comparator class for Tuples
- *
- * @see Tuple
- */
-public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> {
- private final Schema schema;
- private final SortSpec [] sortSpecs;
- private final int[] sortKeyIds;
- private final boolean[] asc;
- @SuppressWarnings("unused")
- private final boolean[] nullFirsts;
-
- private Datum left;
- private Datum right;
- private int compVal;
-
- /**
- * @param schema The schema of input tuples
- * @param sortKeys The description of sort keys
- */
- public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
- Preconditions.checkArgument(sortKeys.length > 0,
- "At least one sort key must be specified.");
-
- this.schema = schema;
- this.sortSpecs = sortKeys;
- this.sortKeyIds = new int[sortKeys.length];
- this.asc = new boolean[sortKeys.length];
- this.nullFirsts = new boolean[sortKeys.length];
- for (int i = 0; i < sortKeys.length; i++) {
- if (sortKeys[i].getSortKey().hasQualifier()) {
- this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
- } else {
- this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
- }
-
- this.asc[i] = sortKeys[i].isAscending();
- this.nullFirsts[i]= sortKeys[i].isNullFirst();
- }
- }
-
- public BaseTupleComparator(TupleComparatorProto proto) {
- this.schema = new Schema(proto.getSchema());
-
- this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
- for (int i = 0; i < proto.getSortSpecsCount(); i++) {
- sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
- }
-
- this.sortKeyIds = new int[proto.getCompSpecsCount()];
- this.asc = new boolean[proto.getCompSpecsCount()];
- this.nullFirsts = new boolean[proto.getCompSpecsCount()];
-
- for (int i = 0; i < proto.getCompSpecsCount(); i++) {
- TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
- sortKeyIds[i] = sortSepcProto.getColumnId();
- asc[i] = sortSepcProto.getAscending();
- nullFirsts[i] = sortSepcProto.getNullFirst();
- }
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public SortSpec [] getSortSpecs() {
- return sortSpecs;
- }
-
- public int [] getSortKeyIds() {
- return sortKeyIds;
- }
-
- @Override
- public boolean isAscendingFirstKey() {
- return this.asc[0];
- }
-
- @Override
- public int compare(Tuple tuple1, Tuple tuple2) {
- for (int i = 0; i < sortKeyIds.length; i++) {
- left = tuple1.get(sortKeyIds[i]);
- right = tuple2.get(sortKeyIds[i]);
-
- if (left.isNull() || right.isNull()) {
- if (!left.equals(right)) {
- if (left.isNull()) {
- compVal = 1;
- } else if (right.isNull()) {
- compVal = -1;
- }
- if (nullFirsts[i]) {
- if (compVal != 0) {
- compVal *= -1;
- }
- }
- } else {
- compVal = 0;
- }
- } else {
- if (asc[i]) {
- compVal = left.compareTo(right);
- } else {
- compVal = right.compareTo(left);
- }
- }
-
- if (compVal < 0 || compVal > 0) {
- return compVal;
- }
- }
- return 0;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(sortKeyIds);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof BaseTupleComparator) {
- BaseTupleComparator other = (BaseTupleComparator) obj;
- if (sortKeyIds.length != other.sortKeyIds.length) {
- return false;
- }
-
- for (int i = 0; i < sortKeyIds.length; i++) {
- if (sortKeyIds[i] != other.sortKeyIds[i] ||
- asc[i] != other.asc[i] ||
- nullFirsts[i] != other.nullFirsts[i]) {
- return false;
- }
- }
-
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public TupleComparatorProto getProto() {
- TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
- builder.setSchema(schema.getProto());
- for (int i = 0; i < sortSpecs.length; i++) {
- builder.addSortSpecs(sortSpecs[i].getProto());
- }
-
- TupleComparatorSpecProto.Builder sortSpecBuilder;
- for (int i = 0; i < sortKeyIds.length; i++) {
- sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
- sortSpecBuilder.setColumnId(sortKeyIds[i]);
- sortSpecBuilder.setAscending(asc[i]);
- sortSpecBuilder.setNullFirst(nullFirsts[i]);
- builder.addCompSpecs(sortSpecBuilder);
- }
-
- return builder.build();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
-
- String prefix = "";
- for (int i = 0; i < sortKeyIds.length; i++) {
- sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
- .append(",Asc=").append(asc[i])
- .append(",NullFirst=").append(nullFirsts[i]);
- prefix = " ,";
- }
- return sb.toString();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
deleted file mode 100644
index 00112e7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-@Deprecated
-public class BinarySerializerDeserializer implements SerializerDeserializer {
-
- static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
-
- @Override
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
- throws IOException {
- byte[] bytes;
- int length = 0;
- if (datum == null || datum instanceof NullDatum) {
- return 0;
- }
-
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- case BIT:
- case CHAR:
- bytes = datum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case INT2:
- length = writeShort(out, datum.asInt2());
- break;
- case INT4:
- length = writeVLong(out, datum.asInt4());
- break;
- case INT8:
- length = writeVLong(out, datum.asInt8());
- break;
- case FLOAT4:
- length = writeFloat(out, datum.asFloat4());
- break;
- case FLOAT8:
- length = writeDouble(out, datum.asFloat8());
- break;
- case TEXT: {
- bytes = datum.asTextBytes();
- length = datum.size();
- if (length == 0) {
- bytes = INVALID_UTF__SINGLE_BYTE;
- length = INVALID_UTF__SINGLE_BYTE.length;
- }
- out.write(bytes, 0, bytes.length);
- break;
- }
- case BLOB:
- case INET4:
- case INET6:
- bytes = datum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case PROTOBUF:
- ProtobufDatum protobufDatum = (ProtobufDatum) datum;
- bytes = protobufDatum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case NULL_TYPE:
- break;
- default:
- throw new IOException("Does not support type");
- }
- return length;
- }
-
- @Override
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
- if (length == 0) return NullDatum.get();
-
- Datum datum;
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- datum = DatumFactory.createBool(bytes[offset]);
- break;
- case BIT:
- datum = DatumFactory.createBit(bytes[offset]);
- break;
- case CHAR: {
- byte[] chars = new byte[length];
- System.arraycopy(bytes, offset, chars, 0, length);
- datum = DatumFactory.createChar(chars);
- break;
- }
- case INT2:
- datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
- break;
- case INT4:
- datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
- break;
- case INT8:
- datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
- break;
- case FLOAT4:
- datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
- break;
- case FLOAT8:
- datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
- break;
- case TEXT: {
- byte[] chars = new byte[length];
- System.arraycopy(bytes, offset, chars, 0, length);
-
- if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
- datum = DatumFactory.createText(new byte[0]);
- } else {
- datum = DatumFactory.createText(chars);
- }
- break;
- }
- case PROTOBUF: {
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
- Message.Builder builder = factory.newBuilder();
- builder.mergeFrom(bytes, offset, length);
- datum = factory.createDatum(builder);
- break;
- }
- case INET4:
- datum = DatumFactory.createInet4(bytes, offset, length);
- break;
- case BLOB:
- datum = DatumFactory.createBlob(bytes, offset, length);
- break;
- default:
- datum = NullDatum.get();
- }
- return datum;
- }
-
- private byte[] shortBytes = new byte[2];
-
- public int writeShort(OutputStream out, short val) throws IOException {
- shortBytes[0] = (byte) (val >> 8);
- shortBytes[1] = (byte) val;
- out.write(shortBytes, 0, 2);
- return 2;
- }
-
- public float toFloat(byte[] bytes, int offset, int length) {
- Preconditions.checkArgument(length == 4);
-
- int val = ((bytes[offset] & 0x000000FF) << 24) +
- ((bytes[offset + 1] & 0x000000FF) << 16) +
- ((bytes[offset + 2] & 0x000000FF) << 8) +
- (bytes[offset + 3] & 0x000000FF);
- return Float.intBitsToFloat(val);
- }
-
- private byte[] floatBytes = new byte[4];
-
- public int writeFloat(OutputStream out, float f) throws IOException {
- int val = Float.floatToIntBits(f);
-
- floatBytes[0] = (byte) (val >> 24);
- floatBytes[1] = (byte) (val >> 16);
- floatBytes[2] = (byte) (val >> 8);
- floatBytes[3] = (byte) val;
- out.write(floatBytes, 0, 4);
- return floatBytes.length;
- }
-
- public double toDouble(byte[] bytes, int offset, int length) {
- Preconditions.checkArgument(length == 8);
- long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
- ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
- ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
- ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
- ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
- ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
- ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
- (long) (bytes[offset + 7] & 0x00000000000000FF);
- return Double.longBitsToDouble(val);
- }
-
- private byte[] doubleBytes = new byte[8];
-
- public int writeDouble(OutputStream out, double d) throws IOException {
- long val = Double.doubleToLongBits(d);
-
- doubleBytes[0] = (byte) (val >> 56);
- doubleBytes[1] = (byte) (val >> 48);
- doubleBytes[2] = (byte) (val >> 40);
- doubleBytes[3] = (byte) (val >> 32);
- doubleBytes[4] = (byte) (val >> 24);
- doubleBytes[5] = (byte) (val >> 16);
- doubleBytes[6] = (byte) (val >> 8);
- doubleBytes[7] = (byte) val;
- out.write(doubleBytes, 0, 8);
- return doubleBytes.length;
- }
-
- private byte[] vLongBytes = new byte[9];
-
- public static int writeVLongToByteArray(byte[] bytes, int offset, long l) {
- if (l >= -112 && l <= 127) {
- bytes[offset] = (byte) l;
- return 1;
- }
-
- int len = -112;
- if (l < 0) {
- l ^= -1L; // take one's complement'
- len = -120;
- }
-
- long tmp = l;
- while (tmp != 0) {
- tmp = tmp >> 8;
- len--;
- }
-
- bytes[offset++] = (byte) len;
- len = (len < -120) ? -(len + 120) : -(len + 112);
-
- for (int idx = len; idx != 0; idx--) {
- int shiftbits = (idx - 1) * 8;
- bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
- }
- return 1 + len;
- }
-
- public int writeVLong(OutputStream out, long l) throws IOException {
- int len = writeVLongToByteArray(vLongBytes, 0, l);
- out.write(vLongBytes, 0, len);
- return len;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java
deleted file mode 100644
index 85c79fa..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/* this class is PooledBuffer holder */
-public class BufferPool {
-
- private static final PooledByteBufAllocator allocator;
-
- private BufferPool() {
- }
-
- static {
- //TODO we need determine the default params
- allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
-
- /* if you are finding memory leak, please enable this line */
- //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
- }
-
- public static long maxDirectMemory() {
- return PlatformDependent.maxDirectMemory();
- }
-
-
- public synchronized static ByteBuf directBuffer(int size) {
- return allocator.directBuffer(size);
- }
-
- /**
- *
- * @param size the initial capacity
- * @param max the max capacity
- * @return allocated ByteBuf from pool
- */
- public static ByteBuf directBuffer(int size, int max) {
- return allocator.directBuffer(size, max);
- }
-
- @InterfaceStability.Unstable
- public static void forceRelease(ByteBuf buf) {
- buf.release(buf.refCnt());
- }
-
- /**
- * the ByteBuf will increase to writable size
- * @param buf
- * @param minWritableBytes required minimum writable size
- */
- public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
- buf.ensureWritable(minWritableBytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
deleted file mode 100644
index b1b6d65..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.DFSInputStream;
-import org.apache.hadoop.io.IOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
-
-public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel {
-
- ByteBufferReadable byteBufferReadable;
- ReadableByteChannel channel;
- InputStream inputStream;
-
- public ByteBufInputChannel(InputStream inputStream) {
- if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) {
- this.byteBufferReadable = (ByteBufferReadable) inputStream;
- } else {
- this.channel = Channels.newChannel(inputStream);
- }
-
- this.inputStream = inputStream;
- }
-
- @Override
- public long read(ByteBuffer[] dsts, int offset, int length) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long read(ByteBuffer[] dsts) {
- return read(dsts, 0, dsts.length);
- }
-
- @Override
- public int read(ByteBuffer dst) throws IOException {
- if (byteBufferReadable != null) {
- return byteBufferReadable.read(dst);
- } else {
- return channel.read(dst);
- }
- }
-
- @Override
- protected void implCloseChannel() throws IOException {
- IOUtils.cleanup(null, channel, inputStream);
- }
-
- public int available() throws IOException {
- return inputStream.available();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
deleted file mode 100644
index 1e2b0f3..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-public class CSVFile {
-
- public static final byte LF = '\n';
- public static int EOF = -1;
-
- private static final Log LOG = LogFactory.getLog(CSVFile.class);
-
- public static class CSVAppender extends FileAppender {
- private final TableMeta meta;
- private final Schema schema;
- private final int columnNum;
- private final FileSystem fs;
- private FSDataOutputStream fos;
- private DataOutputStream outputStream;
- private CompressionOutputStream deflateFilter;
- private char delimiter;
- private TableStatistics stats = null;
- private Compressor compressor;
- private CompressionCodecFactory codecFactory;
- private CompressionCodec codec;
- private Path compressedPath;
- private byte[] nullChars;
- private int BUFFER_SIZE = 128 * 1024;
- private int bufferedBytes = 0;
- private long pos = 0;
- private boolean isShuffle;
-
- private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
- private SerializerDeserializer serde;
-
- public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
- final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
- super(conf, taskAttemptId, schema, meta, workDir);
- this.fs = workDir.getFileSystem(conf);
- this.meta = meta;
- this.schema = schema;
- this.delimiter = StringEscapeUtils.unescapeJava(
- this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
-
- this.columnNum = schema.size();
-
- String nullCharacters = StringEscapeUtils.unescapeJava(
- this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT));
-
- if (StringUtils.isEmpty(nullCharacters)) {
- nullChars = NullDatum.get().asTextBytes();
- } else {
- nullChars = nullCharacters.getBytes();
- }
- }
-
- @Override
- public void init() throws IOException {
- if (!fs.exists(path.getParent())) {
- throw new FileNotFoundException(path.getParent().toString());
- }
-
- //determine the intermediate file type
- String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
- TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
- if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
- isShuffle = true;
- } else {
- isShuffle = false;
- }
-
- if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
- String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
- codecFactory = new CompressionCodecFactory(conf);
- codec = codecFactory.getCodecByClassName(codecName);
- compressor = CodecPool.getCompressor(codec);
- if(compressor != null) compressor.reset(); //builtin gzip is null
-
- String extension = codec.getDefaultExtension();
- compressedPath = path.suffix(extension);
-
- if (fs.exists(compressedPath)) {
- throw new AlreadyExistsStorageException(compressedPath);
- }
-
- fos = fs.create(compressedPath);
- deflateFilter = codec.createOutputStream(fos, compressor);
- outputStream = new DataOutputStream(deflateFilter);
-
- } else {
- if (fs.exists(path)) {
- throw new AlreadyExistsStorageException(path);
- }
- fos = fs.create(path);
- outputStream = new DataOutputStream(new BufferedOutputStream(fos));
- }
-
- if (enabledStats) {
- this.stats = new TableStatistics(this.schema);
- }
-
- try {
- //It will be remove, because we will add custom serde in textfile
- String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
- TextSerializerDeserializer.class.getName());
- serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new IOException(e);
- }
-
- os.reset();
- pos = fos.getPos();
- bufferedBytes = 0;
- super.init();
- }
-
-
- @Override
- public void addTuple(Tuple tuple) throws IOException {
- Datum datum;
- int rowBytes = 0;
-
- for (int i = 0; i < columnNum; i++) {
- datum = tuple.get(i);
- rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
-
- if(columnNum - 1 > i){
- os.write((byte) delimiter);
- rowBytes += 1;
- }
- if (isShuffle) {
- // it is to calculate min/max values, and it is only used for the intermediate file.
- stats.analyzeField(i, datum);
- }
- }
- os.write(LF);
- rowBytes += 1;
-
- pos += rowBytes;
- bufferedBytes += rowBytes;
- if(bufferedBytes > BUFFER_SIZE){
- flushBuffer();
- }
- // Statistical section
- if (enabledStats) {
- stats.incrementRow();
- }
- }
-
- private void flushBuffer() throws IOException {
- if(os.getLength() > 0) {
- os.writeTo(outputStream);
- os.reset();
- bufferedBytes = 0;
- }
- }
- @Override
- public long getOffset() throws IOException {
- return pos;
- }
-
- @Override
- public void flush() throws IOException {
- flushBuffer();
- outputStream.flush();
- }
-
- @Override
- public void close() throws IOException {
-
- try {
- flush();
-
- // Statistical section
- if (enabledStats) {
- stats.setNumBytes(getOffset());
- }
-
- if(deflateFilter != null) {
- deflateFilter.finish();
- deflateFilter.resetState();
- deflateFilter = null;
- }
-
- os.close();
- } finally {
- IOUtils.cleanup(LOG, fos);
- if (compressor != null) {
- CodecPool.returnCompressor(compressor);
- compressor = null;
- }
- }
- }
-
- @Override
- public TableStats getStats() {
- if (enabledStats) {
- return stats.getTableStat();
- } else {
- return null;
- }
- }
-
- public boolean isCompress() {
- return compressor != null;
- }
-
- public String getExtension() {
- return codec != null ? codec.getDefaultExtension() : "";
- }
- }
-
- public static class CSVScanner extends FileScanner implements SeekableScanner {
- public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
- throws IOException {
- super(conf, schema, meta, fragment);
- factory = new CompressionCodecFactory(conf);
- codec = factory.getCodec(this.fragment.getPath());
- if (codec == null || codec instanceof SplittableCompressionCodec) {
- splittable = true;
- }
-
- //Delimiter
- this.delimiter = StringEscapeUtils.unescapeJava(
- meta.getOption(StorageConstants.TEXT_DELIMITER,
- meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).charAt(0);
-
- String nullCharacters = StringEscapeUtils.unescapeJava(
- meta.getOption(StorageConstants.TEXT_NULL,
- meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT)));
-
- if (StringUtils.isEmpty(nullCharacters)) {
- nullChars = NullDatum.get().asTextBytes();
- } else {
- nullChars = nullCharacters.getBytes();
- }
- }
-
- private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
- private char delimiter;
- private FileSystem fs;
- private FSDataInputStream fis;
- private InputStream is; //decompressd stream
- private CompressionCodecFactory factory;
- private CompressionCodec codec;
- private Decompressor decompressor;
- private Seekable filePosition;
- private boolean splittable = false;
- private long startOffset, end, pos;
- private int currentIdx = 0, validIdx = 0, recordCount = 0;
- private int[] targetColumnIndexes;
- private boolean eof = false;
- private final byte[] nullChars;
- private SplitLineReader reader;
- private ArrayList<Long> fileOffsets;
- private ArrayList<Integer> rowLengthList;
- private ArrayList<Integer> startOffsets;
- private NonSyncByteArrayOutputStream buffer;
- private SerializerDeserializer serde;
-
- @Override
- public void init() throws IOException {
- fileOffsets = new ArrayList<Long>();
- rowLengthList = new ArrayList<Integer>();
- startOffsets = new ArrayList<Integer>();
- buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
-
- // FileFragment information
- if(fs == null) {
- fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
- }
- if(fis == null) fis = fs.open(fragment.getPath());
-
- recordCount = 0;
- pos = startOffset = fragment.getStartKey();
- end = startOffset + fragment.getLength();
-
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
- fis, decompressor, startOffset, end,
- SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
- reader = new CompressedSplitLineReader(cIn, conf, null);
- startOffset = cIn.getAdjustedStart();
- end = cIn.getAdjustedEnd();
- filePosition = cIn;
- is = cIn;
- } else {
- is = new DataInputStream(codec.createInputStream(fis, decompressor));
- reader = new SplitLineReader(is, null);
- filePosition = fis;
- }
- } else {
- fis.seek(startOffset);
- filePosition = fis;
- is = fis;
- reader = new SplitLineReader(is, null);
- }
-
- if (targets == null) {
- targets = schema.toArray();
- }
-
- targetColumnIndexes = new int[targets.length];
- for (int i = 0; i < targets.length; i++) {
- targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
- }
-
- try {
- //FIXME
- String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
- TextSerializerDeserializer.class.getName());
- serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new IOException(e);
- }
-
- super.init();
- Arrays.sort(targetColumnIndexes);
- if (LOG.isDebugEnabled()) {
- LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
- "," + fs.getFileStatus(fragment.getPath()).getLen());
- }
-
- if (startOffset != 0) {
- pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
- }
- eof = false;
- page();
- }
-
- private int maxBytesToConsume(long pos) {
- return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
- }
-
- private long fragmentable() throws IOException {
- return end - getFilePosition();
- }
-
- private long getFilePosition() throws IOException {
- long retVal;
- if (isCompress()) {
- retVal = filePosition.getPos();
- } else {
- retVal = pos;
- }
- return retVal;
- }
-
- private void page() throws IOException {
-// // Index initialization
- currentIdx = 0;
- validIdx = 0;
- int currentBufferPos = 0;
- int bufferedSize = 0;
-
- buffer.reset();
- startOffsets.clear();
- rowLengthList.clear();
- fileOffsets.clear();
-
- if(eof) {
- return;
- }
-
- while (DEFAULT_PAGE_SIZE >= bufferedSize){
-
- int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
-
- if(ret == 0){
- break;
- } else {
- fileOffsets.add(pos);
- pos += ret;
- startOffsets.add(currentBufferPos);
- currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
- bufferedSize += ret;
- validIdx++;
- recordCount++;
- }
-
- if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
- eof = true;
- break;
- }
- }
- if (tableStats != null) {
- tableStats.setReadBytes(pos - startOffset);
- tableStats.setNumRows(recordCount);
- }
- }
-
- @Override
- public float getProgress() {
- try {
- if(eof) {
- return 1.0f;
- }
- long filePos = getFilePosition();
- if (startOffset == filePos) {
- return 0.0f;
- } else {
- long readBytes = filePos - startOffset;
- long remainingBytes = Math.max(end - filePos, 0);
- return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes));
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- return 0.0f;
- }
- }
-
- @Override
- public Tuple next() throws IOException {
- try {
- if (currentIdx == validIdx) {
- if (eof) {
- return null;
- } else {
- page();
-
- if(currentIdx == validIdx){
- return null;
- }
- }
- }
-
- long offset = -1;
- if(!isCompress()){
- offset = fileOffsets.get(currentIdx);
- }
-
- byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
- rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
- currentIdx++;
- return new LazyTuple(schema, cells, offset, nullChars, serde);
- } catch (Throwable t) {
- LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
- LOG.error("Tuple list current index: " + currentIdx, t);
- throw new IOException(t);
- }
- }
-
- private boolean isCompress() {
- return codec != null;
- }
-
- @Override
- public void reset() throws IOException {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- decompressor = null;
- }
-
- init();
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (tableStats != null) {
- tableStats.setReadBytes(pos - startOffset); //Actual Processed Bytes. (decompressed bytes + overhead)
- tableStats.setNumRows(recordCount);
- }
-
- IOUtils.cleanup(LOG, reader, is, fis);
- fs = null;
- is = null;
- fis = null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("CSVScanner processed record:" + recordCount);
- }
- } finally {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- decompressor = null;
- }
- }
- }
-
- @Override
- public boolean isProjectable() {
- return true;
- }
-
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- @Override
- public void setSearchCondition(Object expr) {
- }
-
- @Override
- public void seek(long offset) throws IOException {
- if(isCompress()) throw new UnsupportedException();
-
- int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
-
- if (tupleIndex > -1) {
- this.currentIdx = tupleIndex;
- } else if (isSplittable() && end >= offset || startOffset <= offset) {
- eof = false;
- fis.seek(offset);
- pos = offset;
- reader.reset();
- this.currentIdx = 0;
- this.validIdx = 0;
- // pageBuffer();
- } else {
- throw new IOException("invalid offset " +
- " < start : " + startOffset + " , " +
- " end : " + end + " , " +
- " filePos : " + filePosition.getPos() + " , " +
- " input offset : " + offset + " >");
- }
- }
-
- @Override
- public long getNextOffset() throws IOException {
- if(isCompress()) throw new UnsupportedException();
-
- if (this.currentIdx == this.validIdx) {
- if (fragmentable() <= 0) {
- return -1;
- } else {
- page();
- if(currentIdx == validIdx) return -1;
- }
- }
- return fileOffsets.get(currentIdx);
- }
-
- @Override
- public boolean isSplittable(){
- return splittable;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
deleted file mode 100644
index 4f58e68..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * Line reader for compressed splits
- *
- * Reading records from a compressed split is tricky, as the
- * LineRecordReader is using the reported compressed input stream
- * position directly to determine when a split has ended. In addition the
- * compressed input stream is usually faking the actual byte position, often
- * updating it only after the first compressed block after the split is
- * accessed.
- *
- * Depending upon where the last compressed block of the split ends relative
- * to the record delimiters it can be easy to accidentally drop the last
- * record or duplicate the last record between this split and the next.
- *
- * Split end scenarios:
- *
- * 1) Last block of split ends in the middle of a record
- * Nothing special that needs to be done here, since the compressed input
- * stream will report a position after the split end once the record
- * is fully read. The consumer of the next split will discard the
- * partial record at the start of the split normally, and no data is lost
- * or duplicated between the splits.
- *
- * 2) Last block of split ends in the middle of a delimiter
- * The line reader will continue to consume bytes into the next block to
- * locate the end of the delimiter. If a custom delimiter is being used
- * then the next record must be read by this split or it will be dropped.
- * The consumer of the next split will not recognize the partial
- * delimiter at the beginning of its split and will discard it along with
- * the next record.
- *
- * However for the default delimiter processing there is a special case
- * because CR, LF, and CRLF are all valid record delimiters. If the
- * block ends with a CR then the reader must peek at the next byte to see
- * if it is an LF and therefore part of the same record delimiter.
- * Peeking at the next byte is an access to the next block and triggers
- * the stream to report the end of the split. There are two cases based
- * on the next byte:
- *
- * A) The next byte is LF
- * The split needs to end after the current record is returned. The
- * consumer of the next split will discard the first record, which
- * is degenerate since LF is itself a delimiter, and start consuming
- * records after that byte. If the current split tries to read
- * another record then the record will be duplicated between splits.
- *
- * B) The next byte is not LF
- * The current record will be returned but the stream will report
- * the split has ended due to the peek into the next block. If the
- * next record is not read then it will be lost, as the consumer of
- * the next split will discard it before processing subsequent
- * records. Therefore the next record beyond the reported split end
- * must be consumed by this split to avoid data loss.
- *
- * 3) Last block of split ends at the beginning of a delimiter
- * This is equivalent to case 1, as the reader will consume bytes into
- * the next block and trigger the end of the split. No further records
- * should be read as the consumer of the next split will discard the
- * (degenerate) record at the beginning of its split.
- *
- * 4) Last block of split ends at the end of a delimiter
- * Nothing special needs to be done here. The reader will not start
- * examining the bytes into the next block until the next record is read,
- * so the stream will not report the end of the split just yet. Once the
- * next record is read then the next block will be accessed and the
- * stream will indicate the end of the split. The consumer of the next
- * split will correctly discard the first record of its split, and no
- * data is lost or duplicated.
- *
- * If the default delimiter is used and the block ends at a CR then this
- * is treated as case 2 since the reader does not yet know without
- * looking at subsequent bytes whether the delimiter has ended.
- *
- * NOTE: It is assumed that compressed input streams *never* return bytes from
- * multiple compressed blocks from a single read. Failure to do so will
- * violate the buffering performed by this class, as it will access
- * bytes into the next block after the split before returning all of the
- * records from the previous block.
- */
-
-public class CompressedSplitLineReader extends SplitLineReader {
- SplitCompressionInputStream scin;
- private boolean usingCRLF;
- private boolean needAdditionalRecord = false;
- private boolean finished = false;
-
- public CompressedSplitLineReader(SplitCompressionInputStream in,
- Configuration conf,
- byte[] recordDelimiterBytes)
- throws IOException {
- super(in, conf, recordDelimiterBytes);
- scin = in;
- usingCRLF = (recordDelimiterBytes == null);
- }
-
- @Override
- protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
- throws IOException {
- int bytesRead = in.read(buffer);
-
- // If the split ended in the middle of a record delimiter then we need
- // to read one additional record, as the consumer of the next split will
- // not recognize the partial delimiter as a record.
- // However if using the default delimiter and the next character is a
- // linefeed then next split will treat it as a delimiter all by itself
- // and the additional record read should not be performed.
- if (inDelimiter && bytesRead > 0) {
- if (usingCRLF) {
- needAdditionalRecord = (buffer[0] != '\n');
- } else {
- needAdditionalRecord = true;
- }
- }
- return bytesRead;
- }
-
- @Override
- public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
- throws IOException {
- int bytesRead = 0;
- if (!finished) {
- // only allow at most one more record to be read after the stream
- // reports the split ended
- if (scin.getPos() > scin.getAdjustedEnd()) {
- finished = true;
- }
-
- bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
- }
- return bytesRead;
- }
-
- @Override
- public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
- , int maxBytesToConsume) throws IOException {
- int bytesRead = 0;
- if (!finished) {
- // only allow at most one more record to be read after the stream
- // reports the split ended
- if (scin.getPos() > scin.getAdjustedEnd()) {
- finished = true;
- }
-
- bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
- }
- return bytesRead;
- }
-
- @Override
- public boolean needAdditionalRecordAfterSplit() {
- return !finished && needAdditionalRecord;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
deleted file mode 100644
index 8841a31..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-public class DataLocation {
- private String host;
- private int volumeId;
-
- public DataLocation(String host, int volumeId) {
- this.host = host;
- this.volumeId = volumeId;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getVolumeId() {
- return volumeId;
- }
-
- @Override
- public String toString() {
- return "DataLocation{" +
- "host=" + host +
- ", volumeId=" + volumeId +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
deleted file mode 100644
index 2396349..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DiskDeviceInfo {
- private int id;
- private String name;
-
- private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
-
- public DiskDeviceInfo(int id) {
- this.id = id;
- }
-
- public int getId() {
- return id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return id + "," + name;
- }
-
- public void addMountPath(DiskMountInfo diskMountInfo) {
- mountInfos.add(diskMountInfo);
- }
-
- public List<DiskMountInfo> getMountInfos() {
- return mountInfos;
- }
-
- public void setMountInfos(List<DiskMountInfo> mountInfos) {
- this.mountInfos = mountInfos;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
deleted file mode 100644
index 22f18ba..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-public class DiskInfo {
- private int id;
- private String partitionName;
- private String mountPath;
-
- private long capacity;
- private long used;
-
- public DiskInfo(int id, String partitionName) {
- this.id = id;
- this.partitionName = partitionName;
- }
-
- public int getId() {
- return id;
- }
-
- public void setId(int id) {
- this.id = id;
- }
-
- public String getPartitionName() {
- return partitionName;
- }
-
- public void setPartitionName(String partitionName) {
- this.partitionName = partitionName;
- }
-
- public String getMountPath() {
- return mountPath;
- }
-
- public void setMountPath(String mountPath) {
- this.mountPath = mountPath;
- }
-
- public long getCapacity() {
- return capacity;
- }
-
- public void setCapacity(long capacity) {
- this.capacity = capacity;
- }
-
- public long getUsed() {
- return used;
- }
-
- public void setUsed(long used) {
- this.used = used;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
deleted file mode 100644
index aadb0e7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-
-public class DiskMountInfo implements Comparable<DiskMountInfo> {
- private String mountPath;
-
- private long capacity;
- private long used;
-
- private int deviceId;
-
- public DiskMountInfo(int deviceId, String mountPath) {
- this.mountPath = mountPath;
- }
-
- public String getMountPath() {
- return mountPath;
- }
-
- public void setMountPath(String mountPath) {
- this.mountPath = mountPath;
- }
-
- public long getCapacity() {
- return capacity;
- }
-
- public void setCapacity(long capacity) {
- this.capacity = capacity;
- }
-
- public long getUsed() {
- return used;
- }
-
- public void setUsed(long used) {
- this.used = used;
- }
-
- public int getDeviceId() {
- return deviceId;
- }
-
- @Override
- public boolean equals(Object obj){
- if (!(obj instanceof DiskMountInfo)) return false;
-
- if (compareTo((DiskMountInfo) obj) == 0) return true;
- else return false;
- }
-
- @Override
- public int hashCode(){
- return Objects.hashCode(mountPath);
- }
-
- @Override
- public int compareTo(DiskMountInfo other) {
- String path1 = mountPath;
- String path2 = other.mountPath;
-
- int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ;
- int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ;
-
- if(path1Depth > path2Depth) {
- return -1;
- } else if(path1Depth < path2Depth) {
- return 1;
- } else {
- int path1Length = path1.length();
- int path2Length = path2.length();
-
- if(path1Length < path2Length) {
- return 1;
- } else if(path1Length > path2Length) {
- return -1;
- } else {
- return path1.compareTo(path2);
- }
- }
- }
-}