You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/13 05:29:56 UTC
[1/4] TAJO-178: Implements StorageManager for scanning
asynchronously. (hyoungjunkim via hyunsik)
Updated Branches:
refs/heads/master 5d3966a8c -> 5ad7fbae9
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml b/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
index 7111eae..f56413a 100644
--- a/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
+++ b/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
@@ -20,37 +20,86 @@
-->
<configuration>
+ <property>
+ <name>tajo.storage.manager.v2</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.manager.maxReadBytes</name>
+ <value>8388608</value>
+ <description></description>
+ </property>
+
+ <property>
+ <name>tajo.storage.manager.concurrency.perDisk</name>
+ <value>1</value>
+ <description></description>
+ </property>
+
<!--- Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
<value>csv,raw,rcfile,row,trevni</value>
</property>
+ <!--
+ <property>
+ <name>tajo.storage.scanner-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+ </property>
+ -->
+
<property>
<name>tajo.storage.scanner-handler.csv.class</name>
<value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.csv.class</name>
+ <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.rcfile.class</name>
<value>org.apache.tajo.storage.rcfile.RCFileWrapper$RCFileScanner</value>
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
+ <value>org.apache.tajo.storage.v2.RCFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.rowfile.class</name>
<value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.trevni.class</name>
<value>org.apache.tajo.storage.trevni.TrevniScanner</value>
</property>
+ <property>
+ <name>tajo.storage.scanner-handler.v2.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+ </property>
+
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 42c68b6..401bd9e 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -105,7 +105,7 @@ public class TestCompressionStorages {
meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
Path tablePath = new Path(testDir, "SplitCompression");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.enableStats();
appender.init();
@@ -138,7 +138,7 @@ public class TestCompressionStorages {
tablets[0] = new Fragment("SplitCompression", tablePath, meta, 0, randomNum);
tablets[1] = new Fragment("SplitCompression", tablePath, meta, randomNum, (fileLen - randomNum));
- Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
scanner.init();
int tupleCnt = 0;
Tuple tuple;
@@ -147,7 +147,7 @@ public class TestCompressionStorages {
}
scanner.close();
- scanner = StorageManager.getScanner(conf, meta, tablets[1], schema);
+ scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema);
scanner.init();
while ((tuple = scanner.next()) != null) {
tupleCnt++;
@@ -167,7 +167,7 @@ public class TestCompressionStorages {
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.enableStats();
appender.init();
@@ -196,7 +196,7 @@ public class TestCompressionStorages {
Fragment[] tablets = new Fragment[1];
tablets[0] = new Fragment(fileName, tablePath, meta, 0, fileLen);
- Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
scanner.init();
int tupleCnt = 0;
Tuple tuple;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 7c40d3d..b869dbb 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -21,11 +21,6 @@ package org.apache.tajo.storage;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.Schema;
@@ -38,6 +33,11 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.Arrays;
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class TestMergeScanner {
private TajoConf conf;
- StorageManager sm;
+ AbstractStorageManager sm;
private static String TEST_PATH = "target/test-data/TestMergeScanner";
private Path testDir;
private StoreType storeType;
@@ -77,7 +77,7 @@ public class TestMergeScanner {
conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
}
@Test
@@ -92,7 +92,7 @@ public class TestMergeScanner {
TableMeta meta = CatalogUtil.newTableMeta(schema, storeType, options);
Path table1Path = new Path(testDir, storeType + "_1.data");
- Appender appender1 = StorageManager.getAppender(conf, meta, table1Path);
+ Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, table1Path);
appender1.enableStats();
appender1.init();
int tupleNum = 10000;
@@ -114,7 +114,7 @@ public class TestMergeScanner {
}
Path table2Path = new Path(testDir, storeType + "_2.data");
- Appender appender2 = StorageManager.getAppender(conf, meta, table2Path);
+ Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, table2Path);
appender2.enableStats();
appender2.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
index 4c4462f..5881432 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -20,9 +20,6 @@ package org.apache.tajo.storage;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
@@ -32,6 +29,9 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals;
public class TestStorageManager {
private TajoConf conf;
private static String TEST_PATH = "target/test-data/TestStorageManager";
- StorageManager sm = null;
+ AbstractStorageManager sm = null;
private Path testDir;
private FileSystem fs;
@Before
@@ -48,7 +48,7 @@ public class TestStorageManager {
conf = new TajoConf();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
}
@After
@@ -75,14 +75,14 @@ public class TestStorageManager {
Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
fs.mkdirs(path.getParent());
- Appender appender = StorageManager.getAppender(conf, meta, path);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path);
appender.init();
for(Tuple t : tuples) {
appender.addTuple(t);
}
appender.close();
- Scanner scanner = StorageManager.getScanner(conf, meta, path);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, path);
scanner.init();
int i=0;
while(scanner.next() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index bde9835..364600c 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -91,7 +91,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
Path tablePath = new Path(testDir, "Splitable.data");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.enableStats();
appender.init();
int tupleNum = 10000;
@@ -115,7 +115,7 @@ public class TestStorages {
tablets[0] = new Fragment("Splitable", tablePath, meta, 0, randomNum);
tablets[1] = new Fragment("Splitable", tablePath, meta, randomNum, (fileLen - randomNum));
- Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
scanner.init();
int tupleCnt = 0;
while (scanner.next() != null) {
@@ -123,7 +123,7 @@ public class TestStorages {
}
scanner.close();
- scanner = StorageManager.getScanner(conf, meta, tablets[1], schema);
+ scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema);
scanner.init();
while (scanner.next() != null) {
tupleCnt++;
@@ -144,7 +144,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
Path tablePath = new Path(testDir, "testProjection.data");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
int tupleNum = 10000;
VTuple vTuple;
@@ -164,7 +164,7 @@ public class TestStorages {
Schema target = new Schema();
target.addColumn("age", Type.INT8);
target.addColumn("score", Type.FLOAT4);
- Scanner scanner = StorageManager.getScanner(conf, meta, fragment, target);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment, target);
scanner.init();
int tupleCnt = 0;
Tuple tuple;
@@ -201,7 +201,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(schema, storeType, options);
Path tablePath = new Path(testDir, "testVariousTypes.data");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple = new VTuple(12);
@@ -225,7 +225,7 @@ public class TestStorages {
FileStatus status = fs.getFileStatus(tablePath);
Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen());
- Scanner scanner = StorageManager.getScanner(conf, meta, fragment);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment);
scanner.init();
Tuple retrieved;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index d97a27c..97123c6 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -21,8 +21,6 @@ package org.apache.tajo.storage.index;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -32,7 +30,10 @@ import org.apache.tajo.storage.*;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.storage.v2.FileScannerV2;
import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
@@ -73,7 +74,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path tablePath = new Path(testDir, "FindValueInCSV.csv");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple;
for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
@@ -108,8 +109,9 @@ public class TestBSTIndex {
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -130,8 +132,11 @@ public class TestBSTIndex {
tuple = new VTuple(keySchema.getColumnNum());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
reader.open();
- scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+ if(scanner instanceof FileScannerV2) {
+ ((FileScannerV2)scanner).waitScanStart();
+ }
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
@@ -157,7 +162,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path tablePath = new Path(testDir, "BuildIndexWithAppender.csv");
- FileAppender appender = (FileAppender) StorageManager.getAppender(conf, meta, tablePath);
+ FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
SortSpec [] sortKeys = new SortSpec[2];
@@ -205,8 +210,9 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "BuildIndexWithAppender.idx"),
keySchema, comp);
reader.open();
- SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
@@ -232,7 +238,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path tablePath = StorageUtil.concatPath(testDir, "FindOmittedValueInCSV.csv");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple;
for(int i = 0 ; i < TUPLE_NUM; i += 2 ) {
@@ -265,8 +271,9 @@ public class TestBSTIndex {
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -299,7 +306,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path tablePath = new Path(testDir, "FindNextKeyValueInCSV.csv");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple;
for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
@@ -332,9 +339,10 @@ public class TestBSTIndex {
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
-
- SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -355,8 +363,9 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"),
keySchema, comp);
reader.open();
- scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+
Tuple result;
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
keyTuple = new VTuple(2);
@@ -385,7 +394,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path tablePath = new Path(testDir, "FindNextKeyOmittedValueInCSV.csv");
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple;
for(int i = 0 ; i < TUPLE_NUM; i+=2) {
@@ -419,8 +428,9 @@ public class TestBSTIndex {
creater.setLoadNum(LOAD_NUM);
creater.open();
- SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+
Tuple keyTuple;
long offset;
while (true) {
@@ -441,8 +451,13 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInCSV.idx"),
keySchema, comp);
reader.open();
- scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
scanner.init();
+
+ if(scanner instanceof FileScannerV2) {
+ ((FileScannerV2)scanner).waitScanStart();
+ }
+
Tuple result;
for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
keyTuple = new VTuple(2);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 18a69e6..72bdbb0 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -21,8 +21,6 @@ package org.apache.tajo.storage.index;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -34,12 +32,14 @@ import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
+import static org.apache.tajo.storage.CSVFile.CSVScanner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.apache.tajo.storage.CSVFile.CSVScanner;
public class TestSingleCSVFileBSTIndex {
@@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex {
Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
fs.mkdirs(tablePath.getParent());
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple;
for (int i = 0; i < TUPLE_NUM; i++) {
@@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex {
Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
"table1.csv");
fs.mkdirs(tablePath.getParent());
- Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple;
for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml b/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
index 7111eae..637e2f6 100644
--- a/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
+++ b/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
@@ -20,6 +20,11 @@
-->
<configuration>
+ <property>
+ <name>tajo.storage.manager.v2</name>
+ <value>false</value>
+ </property>
+
<!--- Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
@@ -32,25 +37,50 @@
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.csv.class</name>
+ <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.rcfile.class</name>
<value>org.apache.tajo.storage.rcfile.RCFileWrapper$RCFileScanner</value>
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
+ <value>org.apache.tajo.storage.v2.RCFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.rowfile.class</name>
<value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
</property>
<property>
+ <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.trevni.class</name>
<value>org.apache.tajo.storage.trevni.TrevniScanner</value>
</property>
+ <property>
+ <name>tajo.storage.scanner-handler.v2.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+ </property>
+
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
[2/4] TAJO-178: Implements StorageManager for scanning
asynchronously. (hyoungjunkim via hyunsik)
Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 79481fa..9907591 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,148 +18,27 @@
package org.apache.tajo.storage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.TableMetaImpl;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
/**
* StorageManager
*/
-public class StorageManager {
- private final Log LOG = LogFactory.getLog(StorageManager.class);
+public class StorageManager extends AbstractStorageManager {
- private final TajoConf conf;
- private final FileSystem fs;
- private final Path baseDir;
- private final Path tableBaseDir;
- private final boolean blocksMetadataEnabled;
-
- /**
- * Cache of scanner handlers for each storage type.
- */
- private static final Map<String, Class<? extends FileScanner>> SCANNER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends FileScanner>>();
-
- /**
- * Cache of appender handlers for each storage type.
- */
- private static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
-
- /**
- * Cache of constructors for each class. Pins the classes so they
- * can't be garbage collected until ReflectionUtils can be collected.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
-
- public StorageManager(TajoConf conf) throws IOException {
- this.conf = conf;
- this.baseDir = new Path(conf.getVar(ConfVars.ROOT_DIR));
- this.tableBaseDir = new Path(this.baseDir, TajoConstants.WAREHOUSE_DIR_NAME);
- this.fs = baseDir.getFileSystem(conf);
- this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
- if (!this.blocksMetadataEnabled) {
- LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
- }
- }
-
- public static StorageManager get(TajoConf conf) throws IOException {
- return new StorageManager(conf);
- }
-
- public static StorageManager get(TajoConf conf, String dataRoot)
- throws IOException {
- conf.setVar(ConfVars.ROOT_DIR, dataRoot);
- return new StorageManager(conf);
- }
-
- public static StorageManager get(TajoConf conf, Path dataRoot)
- throws IOException {
- conf.setVar(ConfVars.ROOT_DIR, dataRoot.toString());
- return new StorageManager(conf);
- }
-
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
- public Path getBaseDir() {
- return this.baseDir;
- }
-
- public Path getTableBaseDir() {
- return this.tableBaseDir;
- }
-
- public void delete(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- fs.delete(tablePath, true);
+ protected StorageManager(TajoConf conf) throws IOException {
+ super(conf);
}
- public boolean exists(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- return fileSystem.exists(path);
- }
-
- /**
- * This method deletes only data contained in the given path.
- *
- * @param path The path in which data are deleted.
- * @throws IOException
- */
- public void deleteData(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- FileStatus[] fileLists = fileSystem.listStatus(path);
- for (FileStatus status : fileLists) {
- fileSystem.delete(status.getPath(), true);
- }
- }
-
- public Path getTablePath(String tableName) {
- return new Path(tableBaseDir, tableName);
- }
-
- public static Scanner getScanner(Configuration conf, TableMeta meta, Path path)
- throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- FileStatus status = fs.getFileStatus(path);
- Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
- return getScanner(conf, meta, fragment);
- }
-
- public static Scanner getScanner(Configuration conf, TableMeta meta, Fragment fragment)
- throws IOException {
- return getScanner(conf, meta, fragment, meta.getSchema());
- }
-
- public static Scanner getScanner(Configuration conf, TableMeta meta, Fragment fragment,
- Schema target)
- throws IOException {
+ @Override
+ public Scanner getScanner(TableMeta meta, Fragment fragment,
+ Schema target) throws IOException {
Scanner scanner;
- Class<? extends FileScanner> scannerClass;
+ Class<? extends Scanner> scannerClass;
String handlerName = meta.getStoreType().name().toLowerCase();
scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
@@ -167,7 +46,7 @@ public class StorageManager {
scannerClass = conf.getClass(
String.format("tajo.storage.scanner-handler.%s.class",
meta.getStoreType().name().toLowerCase()), null,
- FileScanner.class);
+ Scanner.class);
SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
}
@@ -182,525 +61,4 @@ public class StorageManager {
return scanner;
}
-
- public static Appender getAppender(Configuration conf, TableMeta meta, Path path)
- throws IOException {
- Appender appender;
-
- Class<? extends FileAppender> appenderClass;
-
- String handlerName = meta.getStoreType().name().toLowerCase();
- appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
- if (appenderClass == null) {
- appenderClass = conf.getClass(
- String.format("tajo.storage.appender-handler.%s.class",
- meta.getStoreType().name().toLowerCase()), null,
- FileAppender.class);
- APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
- }
-
- if (appenderClass == null) {
- throw new IOException("Unknown Storage Type: " + meta.getStoreType());
- }
-
- appender = newAppenderInstance(appenderClass, conf, meta, path);
-
- return appender;
- }
-
-
- public TableMeta getTableMeta(Path tablePath) throws IOException {
- TableMeta meta;
-
- FileSystem fs = tablePath.getFileSystem(conf);
- Path tableMetaPath = new Path(tablePath, ".meta");
- if (!fs.exists(tableMetaPath)) {
- throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
- }
-
- FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
- TableProto tableProto = (TableProto) FileUtil.loadProto(tableMetaIn,
- TableProto.getDefaultInstance());
- meta = new TableMetaImpl(tableProto);
-
- return meta;
- }
-
- public Fragment[] split(String tableName) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- public Fragment[] split(String tableName, long fragmentSize) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fragmentSize);
- }
-
- public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- TableMeta meta = getTableMeta(tablePath);
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
- listTablets.add(tablet);
- }
-
- Fragment[] tablets = new Fragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public Fragment[] split(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
- }
-
- public Fragment[] split(String tableName, Path tablePath) throws IOException {
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- private Fragment[] split(String tableName, Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- TableMeta meta = getTableMeta(tablePath);
- long defaultBlockSize = size;
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
- } else {
- listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
- }
- }
-
- Fragment[] tablets = new Fragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
- Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
- } else {
- listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
- }
- }
-
- Fragment[] tablets = new Fragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public long calculateSize(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- long totalSize = 0;
-
- if (fs.exists(tablePath)) {
- for (FileStatus status : fs.listStatus(tablePath)) {
- totalSize += status.getLen();
- }
- }
-
- return totalSize;
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // FileInputFormat Area
- /////////////////////////////////////////////////////////////////////////////
-
- private static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /**
- * Proxy PathFilter that accepts a path only if all filters given in the
- * constructor do. Used by the listPaths() to apply the built-in
- * hiddenFileFilter together with a user provided one (if any).
- */
- private static class MultiPathFilter implements PathFilter {
- private List<PathFilter> filters;
-
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
-
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (!filter.accept(path)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * List input directories.
- * Subclasses may override to, e.g., select only files matching a regular
- * expression.
- *
- * @return array of FileStatus objects
- * @throws IOException if zero items.
- */
- protected List<FileStatus> listStatus(Path path) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- Path[] dirs = new Path[]{path};
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
-
- List<IOException> errors = new ArrayList<IOException>();
-
- // creates a MultiPathFilter with the hiddenFileFilter and the
- // user provided one (if any).
- List<PathFilter> filters = new ArrayList<PathFilter>();
- filters.add(hiddenFileFilter);
-
- PathFilter inputFilter = new MultiPathFilter(filters);
-
- for (int i = 0; i < dirs.length; ++i) {
- Path p = dirs[i];
-
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] matches = fs.globStatus(p, inputFilter);
- if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + p));
- } else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
- } else {
- for (FileStatus globStat : matches) {
- if (globStat.isDirectory()) {
- for (FileStatus stat : fs.listStatus(globStat.getPath(),
- inputFilter)) {
- result.add(stat);
- }
- } else {
- result.add(globStat);
- }
- }
- }
- }
-
- if (!errors.isEmpty()) {
- throw new InvalidInputException(errors);
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- /**
- * Get the lower bound on split size imposed by the format.
- *
- * @return the number of bytes of the minimal split for this format
- */
- protected long getFormatMinSplitSize() {
- return 1;
- }
-
- /**
- * Is the given filename splitable? Usually, true, but if the file is
- * stream compressed, it will not be.
- * <p/>
- * <code>FileInputFormat</code> implementations can override this and return
- * <code>false</code> to ensure that individual input files are never split-up
- * so that Mappers process entire files.
- *
- * @param filename the file name to check
- * @return is this file isSplittable?
- */
- protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
- Scanner scanner = getScanner(conf, meta, filename);
- return scanner.isSplittable();
- }
-
- @Deprecated
- protected long computeSplitSize(long blockSize, long minSize,
- long maxSize) {
- return Math.max(minSize, Math.min(maxSize, blockSize));
- }
-
- @Deprecated
- private static final double SPLIT_SLOP = 1.1; // 10% slop
-
- @Deprecated
- protected int getBlockIndex(BlockLocation[] blkLocations,
- long offset) {
- for (int i = 0; i < blkLocations.length; i++) {
- // is the offset inside this block?
- if ((blkLocations[i].getOffset() <= offset) &&
- (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
- return i;
- }
- }
- BlockLocation last = blkLocations[blkLocations.length - 1];
- long fileLength = last.getOffset() + last.getLength() - 1;
- throw new IllegalArgumentException("Offset " + offset +
- " is outside of file (0.." +
- fileLength + ")");
- }
-
- /**
- * A factory that makes the split for this class. It can be overridden
- * by sub-classes to make sub-types
- */
- protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
- return new Fragment(fragmentId, file, meta, start, length);
- }
-
- protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
- int[] diskIds) throws IOException {
- return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
- }
-
- // for Non Splittable. eg, compressed gzip TextFile
- protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
- BlockLocation[] blkLocations) throws IOException {
-
- Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
- for (BlockLocation blockLocation : blkLocations) {
- for (String host : blockLocation.getHosts()) {
- if (hostsBlockMap.containsKey(host)) {
- hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
- } else {
- hostsBlockMap.put(host, 1);
- }
- }
- }
-
- List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
- Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
- @Override
- public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
- return v1.getValue().compareTo(v2.getValue());
- }
- });
-
- String[] hosts = new String[blkLocations[0].getHosts().length];
- int[] hostsBlockCount = new int[blkLocations[0].getHosts().length];
-
- for (int i = 0; i < hosts.length; i++) {
- Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
- hosts[i] = entry.getKey();
- hostsBlockCount[i] = entry.getValue();
- }
- return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
- }
-
- /**
- * Get the maximum split size.
- *
- * @return the maximum number of bytes a split can include
- */
- @Deprecated
- public static long getMaxSplitSize() {
- // TODO - to be configurable
- return 536870912L;
- }
-
- /**
- * Get the minimum split size
- *
- * @return the minimum number of bytes that can be in a split
- */
- @Deprecated
- public static long getMinSplitSize() {
- // TODO - to be configurable
- return 67108864L;
- }
-
- /**
- * Get Disk Ids by Volume Bytes
- */
- private int[] getDiskIds(VolumeId[] volumeIds) {
- int[] diskIds = new int[volumeIds.length];
- for (int i = 0; i < volumeIds.length; i++) {
- int diskId = -1;
- if (volumeIds[i] != null && volumeIds[i].isValid()) {
- String volumeIdString = volumeIds[i].toString();
- byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
-
- if (volumeIdBytes.length == 4) {
- diskId = Bytes.toInt(volumeIdBytes);
- } else if (volumeIdBytes.length == 1) {
- diskId = (int) volumeIdBytes[0]; // support hadoop-2.0.2
- }
- }
- diskIds[i] = diskId;
- }
- return diskIds;
- }
-
- /**
- * Generate the map of host and make them into Volume Ids.
- *
- */
- private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
- Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
- for (Fragment frag : frags) {
- String[] hosts = frag.getHosts();
- int[] diskIds = frag.getDiskIds();
- for (int i = 0; i < hosts.length; i++) {
- Set<Integer> volumeList = volumeMap.get(hosts[i]);
- if (volumeList == null) {
- volumeList = new HashSet<Integer>();
- volumeMap.put(hosts[i], volumeList);
- }
-
- if (diskIds.length > 0 && diskIds[i] > -1) {
- volumeList.add(diskIds[i]);
- }
- }
- }
-
- return volumeMap;
- }
- /**
- * Generate the list of files and make them into FileSplits.
- *
- * @throws IOException
- */
- public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
- // generate splits'
-
- List<Fragment> splits = new ArrayList<Fragment>();
- List<FileStatus> files = listStatus(inputPath);
- FileSystem fs = inputPath.getFileSystem(conf);
- for (FileStatus file : files) {
- Path path = file.getPath();
- long length = file.getLen();
- if (length > 0) {
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- boolean splittable = isSplittable(meta, path);
- if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
- // supported disk volume
- BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
- .getFileBlockStorageLocations(Arrays.asList(blkLocations));
- if (splittable) {
- for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
- splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
- .getVolumeIds())));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
- }
-
- } else {
- if (splittable) {
- for (BlockLocation blockLocation : blkLocations) {
- splits.add(makeSplit(tableName, meta, path, blockLocation, null));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
- }
- }
- } else {
- //for zero length files
- splits.add(makeSplit(tableName, meta, path, 0, length));
- }
- }
-
- LOG.info("Total # of splits: " + splits.size());
- return splits;
- }
-
- private class InvalidInputException extends IOException {
- public InvalidInputException(
- List<IOException> errors) {
- }
- }
-
- private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
- Configuration.class,
- TableMeta.class,
- Fragment.class
- };
-
- private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
- Configuration.class,
- TableMeta.class,
- Path.class
- };
-
- /**
- * create a scanner instance.
- */
- public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta,
- Fragment fragment) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, meta, fragment});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
-
- /**
- * create a scanner instance.
- */
- public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta,
- Path path) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, meta, path});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
new file mode 100644
index 0000000..8b7c2ca
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+public class StorageManagerFactory {
+ private static Map<String, AbstractStorageManager> storageManagers =
+ new HashMap<String, AbstractStorageManager>();
+
+ public static AbstractStorageManager getStorageManager(TajoConf conf) throws IOException {
+ return getStorageManager(conf, null);
+ }
+
+ public static synchronized AbstractStorageManager getStorageManager (
+ TajoConf conf, Path dataRoot) throws IOException {
+ return getStorageManager(conf, dataRoot, conf.getBoolean("tajo.storage.manager.v2", false));
+ }
+
+ private static synchronized AbstractStorageManager getStorageManager (
+ TajoConf conf, Path dataRoot, boolean v2) throws IOException {
+ if(dataRoot != null) {
+ conf.setVar(TajoConf.ConfVars.ROOT_DIR, dataRoot.toString());
+ }
+
+ URI uri;
+ if(dataRoot == null) {
+ uri = FileSystem.get(conf).getUri();
+ } else {
+ uri = dataRoot.toUri();
+ }
+ String key = "file".equals(uri.getScheme()) ? "file" : uri.getScheme() + uri.getHost() + uri.getPort();
+
+ if(v2) {
+ key += "_v2";
+ }
+
+ if(storageManagers.containsKey(key)) {
+ return storageManagers.get(key);
+ } else {
+ AbstractStorageManager storageManager = null;
+
+ if(v2) {
+ storageManager = new StorageManagerV2(conf);
+ } else {
+ storageManager = new StorageManager(conf);
+ }
+
+ storageManagers.put(key, storageManager);
+
+ return storageManager;
+ }
+ }
+
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Fragment fragment, Schema schema) throws IOException {
+ return (SeekableScanner)getStorageManager(conf, null, false).getScanner(meta, fragment, schema);
+ }
+
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Path path) throws IOException {
+
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus status = fs.getFileStatus(path);
+ Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
+
+ return getSeekableScanner(conf, meta, fragment, fragment.getSchema());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
new file mode 100644
index 0000000..f34fa84
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.SeekableScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.json.StorageGsonHelper;
+
+public class CSVFileScanner extends FileScannerV2 {
+ public static final String DELIMITER = "csvfile.delimiter";
+ public static final String DELIMITER_DEFAULT = "|";
+ public static final byte LF = '\n';
+ private static final Log LOG = LogFactory.getLog(CSVFileScanner.class);
+
+ private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
+ private int bufSize;
+ private char delimiter;
+ private FSDataInputStream fis;
+ private InputStream is; //decompressd stream
+ private CompressionCodecFactory factory;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+ private Seekable filePosition;
+ private boolean splittable = true;
+ private long startOffset, length;
+ private byte[] buf = null;
+ private String[] tuples = null;
+ private long[] tupleOffsets = null;
+ private int currentIdx = 0, validIdx = 0;
+ private byte[] tail = null;
+ private long pageStart = -1;
+ private long prevTailLen = -1;
+ private int[] targetColumnIndexes;
+ private boolean eof = false;
+
+ public CSVFileScanner(Configuration conf, final TableMeta meta,
+ final Fragment fragment) throws IOException {
+ super(conf, meta, fragment);
+ factory = new CompressionCodecFactory(conf);
+ codec = factory.getCodec(fragment.getPath());
+ if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
+ splittable = false;
+ }
+ }
+
+ @Override
+ public void init() throws IOException {
+ // Buffer size, Delimiter
+ this.bufSize = DEFAULT_BUFFER_SIZE;
+ String delim = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
+ this.delimiter = delim.charAt(0);
+
+ super.init();
+ }
+
+ @Override
+ protected void initFirstScan() throws IOException {
+ if(!firstSchdeuled) {
+ return;
+ }
+ firstSchdeuled = false;
+
+ // Fragment information
+ fis = fs.open(fragment.getPath(), 128 * 1024);
+ startOffset = fragment.getStartOffset();
+ length = fragment.getLength();
+
+ if (startOffset > 0) {
+ startOffset--; // prev line feed
+ }
+
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+ fis, decompressor, startOffset, startOffset + length,
+ SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+ startOffset = cIn.getAdjustedStart();
+ length = cIn.getAdjustedEnd() - startOffset;
+ filePosition = cIn;
+ is = cIn;
+ } else {
+ is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ }
+ } else {
+ fis.seek(startOffset);
+ filePosition = fis;
+ is = fis;
+ }
+
+ tuples = new String[0];
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + length +
+ "," + fs.getFileStatus(fragment.getPath()).getLen());
+ }
+
+ if (startOffset != 0) {
+ int rbyte;
+ while ((rbyte = is.read()) != LF) {
+ if(rbyte == -1) break;
+ }
+ }
+
+ if (fragmentable() < 1) {
+ close();
+ return;
+ }
+ page();
+ }
+
+ private long fragmentable() throws IOException {
+ return startOffset + length - getFilePosition();
+ }
+
+ @Override
+ protected long getFilePosition() throws IOException {
+ long retVal;
+ if (filePosition != null) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = fis.getPos();
+ }
+ return retVal;
+ }
+
+ private void page() throws IOException {
+ // Index initialization
+ currentIdx = 0;
+
+ // Buffer size set
+ if (isSplittable() && fragmentable() < DEFAULT_BUFFER_SIZE) {
+ bufSize = (int) fragmentable();
+ }
+
+ if (this.tail == null || this.tail.length == 0) {
+ this.pageStart = getFilePosition();
+ this.prevTailLen = 0;
+ } else {
+ this.pageStart = getFilePosition() - this.tail.length;
+ this.prevTailLen = this.tail.length;
+ }
+
+ // Read
+ int rbyte;
+ buf = new byte[bufSize];
+ rbyte = is.read(buf);
+
+ if (rbyte < 0) {
+ eof = true; // EOF
+ return;
+ }
+
+ if (prevTailLen == 0) {
+ tail = new byte[0];
+ tuples = StringUtils.split(new String(buf, 0, rbyte), (char) LF);
+ } else {
+ tuples = StringUtils.split(new String(tail)
+ + new String(buf, 0, rbyte), (char) LF);
+ tail = null;
+ }
+
+ // Check tail
+ if ((char) buf[rbyte - 1] != LF) {
+ if ((fragmentable() < 1 || rbyte != bufSize)) {
+ int cnt = 0;
+ byte[] temp = new byte[DEFAULT_BUFFER_SIZE];
+ // Read bytes
+ while ((temp[cnt] = (byte) is.read()) != LF) {
+ cnt++;
+ }
+
+ // Replace tuple
+ tuples[tuples.length - 1] = tuples[tuples.length - 1]
+ + new String(temp, 0, cnt);
+ validIdx = tuples.length;
+ } else {
+ tail = tuples[tuples.length - 1].getBytes();
+ validIdx = tuples.length - 1;
+ }
+ } else {
+ tail = new byte[0];
+ validIdx = tuples.length;
+ }
+
+ if(!isCompress()) makeTupleOffset();
+ }
+
+ private void makeTupleOffset() {
+ long curTupleOffset = 0;
+ this.tupleOffsets = new long[this.validIdx];
+ for (int i = 0; i < this.validIdx; i++) {
+ this.tupleOffsets[i] = curTupleOffset + this.pageStart;
+ curTupleOffset += this.tuples[i].getBytes().length + 1;// tuple byte
+ // + 1byte
+ // line feed
+ }
+ }
+
+ protected Tuple getNextTuple() throws IOException {
+ try {
+ if (currentIdx == validIdx) {
+ if (isSplittable() && fragmentable() < 1) {
+ close();
+ return null;
+ } else {
+ page();
+ }
+
+ if(eof){
+ close();
+ return null;
+ }
+ }
+
+ long offset = -1;
+ if(!isCompress()){
+ offset = this.tupleOffsets[currentIdx];
+ }
+
+ String[] cells = StringUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter);
+
+ int targetLen = targets.length;
+
+ VTuple tuple = new VTuple(columnNum);
+ Column field;
+ tuple.setOffset(offset);
+ for (int i = 0; i < targetLen; i++) {
+ field = targets[i];
+ int tid = targetColumnIndexes[i];
+ if (cells.length <= tid) {
+ tuple.put(tid, DatumFactory.createNullDatum());
+ } else {
+ String cell = cells[tid].trim();
+
+ if (cell.equals("")) {
+ tuple.put(tid, DatumFactory.createNullDatum());
+ } else {
+ switch (field.getDataType().getType()) {
+ case BOOLEAN:
+ tuple.put(tid, DatumFactory.createBool(cell));
+ break;
+ case BIT:
+ tuple.put(tid, DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
+ break;
+ case CHAR:
+ String trimmed = cell.trim();
+ tuple.put(tid, DatumFactory.createChar(trimmed));
+ break;
+ case BLOB:
+ tuple.put(tid, DatumFactory.createBlob(Base64.decodeBase64(cell)));
+ break;
+ case INT2:
+ tuple.put(tid, DatumFactory.createInt2(cell));
+ break;
+ case INT4:
+ tuple.put(tid, DatumFactory.createInt4(cell));
+ break;
+ case INT8:
+ tuple.put(tid, DatumFactory.createInt8(cell));
+ break;
+ case FLOAT4:
+ tuple.put(tid, DatumFactory.createFloat4(cell));
+ break;
+ case FLOAT8:
+ tuple.put(tid, DatumFactory.createFloat8(cell));
+ break;
+ case TEXT:
+ tuple.put(tid, DatumFactory.createText(cell));
+ break;
+ case INET4:
+ tuple.put(tid, DatumFactory.createInet4(cell));
+ break;
+ case ARRAY:
+ Datum data = StorageGsonHelper.getInstance().fromJson(cell,
+ Datum.class);
+ tuple.put(tid, data);
+ break;
+ }
+ }
+ }
+ }
+ return tuple;
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ return null;
+ }
+
+ private boolean isCompress() {
+ return codec != null;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ super.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(closed.get()) {
+ return;
+ }
+ try {
+ is.close();
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ tuples = null;
+ super.close();
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public boolean isSplittable(){
+ return splittable;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
new file mode 100644
index 0000000..7802c91
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DiskDeviceInfo {
+ private int id;
+ private String name;
+
+ private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
+
+ public DiskDeviceInfo(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return id + "," + name;
+ }
+
+ public void addMountPath(DiskMountInfo diskMountInfo) {
+ mountInfos.add(diskMountInfo);
+ }
+
+ public List<DiskMountInfo> getMountInfos() {
+ return mountInfos;
+ }
+
+ public void setMountInfos(List<DiskMountInfo> mountInfos) {
+ this.mountInfos = mountInfos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
new file mode 100644
index 0000000..d55a6db
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Per-disk scan scheduler. Each instance is a daemon-style thread bound to a
 * single {@link DiskDeviceInfo}; scanners queue themselves via
 * {@link #requestScanFile(FileScannerV2)} and are given read "turns", at most
 * {@code scanConcurrency} running concurrently, so that one disk is not hit
 * by an unbounded number of simultaneous readers.
 */
public class DiskFileScanScheduler extends Thread {
  private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);

  // Scanners waiting for their next scan turn on this disk.
  private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();

  // Scanners whose consumers are still draining buffered tuples; they are
  // parked here and re-queued later by FetchWaitingThread.
  // NOTE(review): run() adds to this LinkedList outside the
  // synchronized(fetchingScanners) block — confirm this cannot race with
  // FetchWaitingThread's poll().
  Queue<FileScannerV2> fetchingScanners = new LinkedList<FileScannerV2>();

  // Maximum number of scanners allowed to read this disk at once.
  private int scanConcurrency;

  private AtomicInteger numOfRunningScanners = new AtomicInteger(0);

  // Monitor guarding requestQueue and used to signal queue/slot changes.
  private Object requestQueueMonitor = new Object();

  private StorageManagerV2.StorgaeManagerContext smContext;

  private DiskDeviceInfo diskDeviceInfo;

  private AtomicBoolean stopped = new AtomicBoolean(false);

  // Total number of scan turns handed out since startup (monitoring only).
  private long totalScanCount = 0;

  private FetchWaitingThread fetchWaitingThread;

  public DiskFileScanScheduler(
      StorageManagerV2.StorgaeManagerContext smContext,
      DiskDeviceInfo diskDeviceInfo) {
    super("DiskFileScanner:" + diskDeviceInfo);
    this.smContext = smContext;
    this.diskDeviceInfo = diskDeviceInfo;
    initScannerPool();
    this.fetchWaitingThread = new FetchWaitingThread();
    this.fetchWaitingThread.start();
  }

  public int getDiskId() {
    return diskDeviceInfo.getId();
  }

  /**
   * Main dispatch loop. Holds requestQueueMonitor for the whole loop, but the
   * lock is released while in wait(), so producers can still enqueue. When a
   * concurrency slot is free and a scanner is queued, either parks the
   * scanner in fetchingScanners (its consumer is still busy) or starts a
   * FileScanRunner thread for one scan turn.
   */
  public void run() {
    synchronized (requestQueueMonitor) {
      while(!stopped.get()) {
        if(isAllScannerRunning()) {
          // All slots busy: wait (with timeout, so stop() is noticed) for a
          // runner to finish and notify.
          try {
            requestQueueMonitor.wait(2000);
            continue;
          } catch (InterruptedException e) {
            break;
          }
        } else {
          FileScannerV2 fileScanner = requestQueue.poll();
          if(fileScanner == null) {
            // Nothing queued: wait for requestScanFile() to signal.
            try {
              requestQueueMonitor.wait(2000);
              continue;
            } catch (InterruptedException e) {
              break;
            }
          }
          if(fileScanner.isFetchProcessing()) {
            // Consumer has not drained the scanner's tuple pool yet; defer it
            // to FetchWaitingThread instead of burning a disk slot.
            fetchingScanners.add(fileScanner);
            synchronized(fetchingScanners) {
              fetchingScanners.notifyAll();
            }
          } else {
            numOfRunningScanners.incrementAndGet();
            FileScanRunner fileScanRunner = new FileScanRunner(
                DiskFileScanScheduler.this, smContext,
                fileScanner, requestQueueMonitor,
                numOfRunningScanners);
            totalScanCount++;
            fileScanRunner.start();
          }
        }
      }
    }
  }

  /** Enqueues a scanner for its next scan turn and wakes the dispatch loop. */
  protected void requestScanFile(FileScannerV2 fileScannerV2) {
    synchronized (requestQueueMonitor) {
      requestQueue.offer(fileScannerV2);
      requestQueueMonitor.notifyAll();
    }
  }

  /**
   * Moves deferred scanners back into the request queue, sleeping 10 ms per
   * scanner as a simple throttle so a busy consumer is not re-polled in a
   * tight loop.
   */
  public class FetchWaitingThread extends Thread {
    public void run() {
      while(!stopped.get()) {
        FileScannerV2 scanner = null;
        synchronized(fetchingScanners) {
          scanner = fetchingScanners.poll();
          if(scanner == null) {
            try {
              fetchingScanners.wait();
              continue;
            } catch (InterruptedException e) {
              break;
            }
          }
        }
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          break;
        }
        synchronized(requestQueueMonitor) {
          requestQueue.offer(scanner);
          requestQueueMonitor.notifyAll();
        }
      }
    }
  }

  private void initScannerPool() {
    // TODO finally implements heuristic, currently set with property
    scanConcurrency = smContext.getConf().getInt("tajo.storage.manager.concurrency.perDisk", 1);
  }

  public int getTotalQueueSize() {
    return requestQueue.size();
  }

  /** True when every concurrency slot for this disk is occupied. */
  boolean isAllScannerRunning() {
    return numOfRunningScanners.get() >= scanConcurrency;
  }

  public long getTotalScanCount() {
    return totalScanCount;
  }

  /** Stops both the dispatch loop and the fetch-waiting helper thread. */
  public void stopScan() {
    stopped.set(true);
    if (fetchWaitingThread != null) {
      fetchWaitingThread.interrupt();
    }

    this.interrupt();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
new file mode 100644
index 0000000..d71154c
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
/**
 * Simple value holder for one disk partition: its id, partition name,
 * mount path, and capacity/usage figures in bytes.
 */
public class DiskInfo {
  private int id;
  private String partitionName;
  private String mountPath;

  private long capacity;
  private long used;

  public DiskInfo(int id, String partitionName) {
    this.id = id;
    this.partitionName = partitionName;
  }

  public int getId() {
    return id;
  }

  public String getPartitionName() {
    return partitionName;
  }

  public String getMountPath() {
    return mountPath;
  }

  public long getCapacity() {
    return capacity;
  }

  public long getUsed() {
    return used;
  }

  public void setId(int id) {
    this.id = id;
  }

  public void setPartitionName(String partitionName) {
    this.partitionName = partitionName;
  }

  public void setMountPath(String mountPath) {
    this.mountPath = mountPath;
  }

  public void setCapacity(long capacity) {
    this.capacity = capacity;
  }

  public void setUsed(long used) {
    this.used = used;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
new file mode 100644
index 0000000..d9b0dd2
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
/**
 * One mount point on a disk device. Ordering (via {@link Comparable}) puts
 * more specific mount paths first: deeper paths before shallower ones, and
 * among equally deep paths the longer string first, so that the most
 * specific mount containing a file path is found first when scanning a
 * sorted collection.
 */
public class DiskMountInfo implements Comparable<DiskMountInfo> {
  private String mountPath;

  private long capacity;
  private long used;

  private int deviceId;

  public DiskMountInfo(int deviceId, String mountPath) {
    // BUGFIX: deviceId was previously dropped, so getDeviceId() always
    // returned 0 regardless of the constructor argument.
    this.deviceId = deviceId;
    this.mountPath = mountPath;
  }

  public String getMountPath() {
    return mountPath;
  }

  public void setMountPath(String mountPath) {
    this.mountPath = mountPath;
  }

  public long getCapacity() {
    return capacity;
  }

  public void setCapacity(long capacity) {
    this.capacity = capacity;
  }

  public long getUsed() {
    return used;
  }

  public void setUsed(long used) {
    this.used = used;
  }

  public int getDeviceId() {
    return deviceId;
  }

  /**
   * Orders by path depth descending, then by path length descending, then
   * lexicographically. "/" counts as depth 0.
   */
  @Override
  public int compareTo(DiskMountInfo other) {
    String path1 = mountPath;
    String path2 = other.mountPath;

    int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ;
    int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ;

    if(path1Depth > path2Depth) {
      return -1;
    } else if(path1Depth < path2Depth) {
      return 1;
    } else {
      int path1Length = path1.length();
      int path2Length = path2.length();

      if(path1Length < path2Length) {
        return 1;
      } else if(path1Length > path2Length) {
        // BUGFIX: was "path1Length > path1Length" (always false), which made
        // the longer-path-first rule unreachable.
        return -1;
      } else {
        return path1.compareTo(path2);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
new file mode 100644
index 0000000..2daf0f5
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class DiskUtil {
+
+ public enum OSType {
+ OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
+ }
+
+ static private OSType getOSType() {
+ String osName = System.getProperty("os.name");
+ if (osName.contains("Windows")
+ && (osName.contains("XP") || osName.contains("2003")
+ || osName.contains("Vista")
+ || osName.contains("Windows_7")
+ || osName.contains("Windows 7") || osName
+ .contains("Windows7"))) {
+ return OSType.OS_TYPE_WINXP;
+ } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+ return OSType.OS_TYPE_SOLARIS;
+ } else if (osName.contains("Mac")) {
+ return OSType.OS_TYPE_MAC;
+ } else {
+ return OSType.OS_TYPE_UNIX;
+ }
+ }
+
+ public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
+ List<DiskDeviceInfo> deviceInfos;
+
+ if(getOSType() == OSType.OS_TYPE_UNIX) {
+ deviceInfos = getUnixDiskDeviceInfos();
+ setDeviceMountInfo(deviceInfos);
+ } else {
+ deviceInfos = getDefaultDiskDeviceInfos();
+ }
+
+ return deviceInfos;
+ }
+
+ private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
+ List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+ File file = new File("/proc/partitions");
+ if(!file.exists()) {
+ System.out.println("No partition file:" + file.getAbsolutePath());
+ return getDefaultDiskDeviceInfos();
+ }
+
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream("/proc/partitions")));
+ String line = null;
+
+ int count = 0;
+ Set<String> deviceNames = new TreeSet<String>();
+ while((line = reader.readLine()) != null) {
+ if(count > 0 && !line.trim().isEmpty()) {
+ String[] tokens = line.trim().split(" +");
+ if(tokens.length == 4) {
+ String deviceName = getDiskDeviceName(tokens[3]);
+ deviceNames.add(deviceName);
+ }
+ }
+ count++;
+ }
+
+ int id = 0;
+ for(String eachDeviceName: deviceNames) {
+ DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
+ diskDeviceInfo.setName(eachDeviceName);
+
+ //TODO set addtional info
+ // /sys/block/sda/queue
+ infos.add(diskDeviceInfo);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if(reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ return infos;
+ }
+
+ private static String getDiskDeviceName(String partitionName) {
+ byte[] bytes = partitionName.getBytes();
+
+ byte[] result = new byte[bytes.length];
+ int length = 0;
+ for(int i = 0; i < bytes.length; i++, length++) {
+ if(bytes[i] >= '0' && bytes[i] <= '9') {
+ break;
+ } else {
+ result[i] = bytes[i];
+ }
+ }
+
+ return new String(result, 0, length);
+ }
+
+ private static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
+ DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
+ diskDeviceInfo.setName("default");
+
+ List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+ infos.add(diskDeviceInfo);
+
+ return infos;
+ }
+
+
+ private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
+ Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
+ for(DiskDeviceInfo eachDevice: deviceInfos) {
+ deviceMap.put(eachDevice.getName(), eachDevice);
+ }
+
+ BufferedReader mountOutput = null;
+ try {
+ Process mountProcess = Runtime.getRuntime().exec("mount");
+ mountOutput = new BufferedReader(new InputStreamReader(
+ mountProcess.getInputStream()));
+ while (true) {
+ String line = mountOutput.readLine();
+ if (line == null) {
+ break;
+ }
+
+ int indexStart = line.indexOf(" on /");
+ int indexEnd = line.indexOf(" ", indexStart + 4);
+
+ String deviceName = line.substring(0, indexStart).trim();
+ System.out.println(deviceName);
+ String[] deviceNameTokens = deviceName.split("/");
+ if(deviceNameTokens.length == 3) {
+ if("dev".equals(deviceNameTokens[1])) {
+ String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
+ String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
+
+ DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
+ if(diskDeviceInfo != null) {
+ diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ if (mountOutput != null) {
+ mountOutput.close();
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.out.println("/dev/sde1".split("/").length);
+ for(String eachToken: "/dev/sde1".split("/")) {
+ System.out.println(eachToken);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
new file mode 100644
index 0000000..10f12be
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FileScanRunner extends Thread {
+ private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
+
+ StorageManagerV2.StorgaeManagerContext smContext;
+ FileScannerV2 fileScanner;
+ Object requestQueueMonitor;
+ AtomicInteger numOfRunningScanners;
+ DiskFileScanScheduler diskFileScanScheduler;
+
+ int maxReadBytes;
+
+ public FileScanRunner(DiskFileScanScheduler diskFileScanScheduler,
+ StorageManagerV2.StorgaeManagerContext smContext,
+ FileScannerV2 fileScanner, Object requestQueueMonitor,
+ AtomicInteger numOfRunningScanners) {
+ super("FileScanRunner:" + fileScanner.getId());
+ this.diskFileScanScheduler = diskFileScanScheduler;
+ this.fileScanner = fileScanner;
+ this.smContext = smContext;
+ this.requestQueueMonitor = requestQueueMonitor;
+ this.numOfRunningScanners = numOfRunningScanners;
+
+ this.maxReadBytes = smContext.getMaxReadBytesPerScheduleSlot();
+ }
+
+ public void run() {
+ try {
+ long startTime = System.currentTimeMillis();
+ boolean fetching = fileScanner.isFetchProcessing();
+ fileScanner.scan(maxReadBytes);
+// if(diskFileScanScheduler.getDiskId() == 1) {
+// LOG.info("========>" + diskFileScanScheduler.getDiskId() + "," + fileScanner.getId() +
+// ",fetching=" + fetching +
+// ", scanTime:" + (System.currentTimeMillis() - startTime) + " ms");
+// }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ synchronized(requestQueueMonitor) {
+ numOfRunningScanners.decrementAndGet();
+ requestQueueMonitor.notifyAll();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
new file mode 100644
index 0000000..44c48a5
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Base class for asynchronous (v2) file scanners. A scheduler thread fills
 * {@code tuplePool} via {@link #scan(int)} in bounded "turns", while the query
 * execution thread drains it through {@link #next()}. Subclasses implement the
 * actual file-format decoding via the three abstract methods.
 */
public abstract class FileScannerV2 implements Scanner {
  private static final Log LOG = LogFactory.getLog(FileScannerV2.class);

  // NOTE(review): the boolean value is never set to true in this class; the
  // object is only used as a monitor in waitScanStart(). Confirm whether the
  // flag itself is still needed.
  protected AtomicBoolean fetchProcessing = new AtomicBoolean(false);

  protected AtomicBoolean closed = new AtomicBoolean(false);

  protected FileSystem fs;

  protected boolean inited = false;
  protected final Configuration conf;
  protected final TableMeta meta;
  protected final Schema schema;
  protected final Fragment fragment;
  protected final int columnNum;
  protected Column[] targets;

  protected StorageManagerV2.StorgaeManagerContext smContext;

  // True until the first scan turn runs initFirstScan().
  protected boolean firstSchdeuled = true;

  // Producer/consumer buffer between the scheduler thread and next().
  // Also used as the wait/notify monitor for the consumer.
  protected Queue<Tuple> tuplePool;

  // Approximate bytes currently buffered in tuplePool; drives back-pressure
  // in isFetchProcessing().
  AtomicInteger tuplePoolMemory = new AtomicInteger();

  /** Decodes and returns the next tuple, or null at end of the fragment. */
  protected abstract Tuple getNextTuple() throws IOException;

  /** Opens the underlying file; called once on the first scan turn. */
  protected abstract void initFirstScan() throws IOException;

  /** Current read position in the file, used to enforce the per-turn byte budget. */
  protected abstract long getFilePosition() throws IOException;

  public FileScannerV2(final Configuration conf,
                       final TableMeta meta,
                       final Fragment fragment) throws IOException {
    this.conf = conf;
    this.meta = meta;
    this.schema = meta.getSchema();
    this.fragment = fragment;
    this.columnNum = this.schema.getColumnNum();

    this.fs = fragment.getPath().getFileSystem(conf);

    tuplePool = new ConcurrentLinkedQueue<Tuple>();
  }

  /**
   * Resets state and registers this scanner with the storage manager so the
   * disk scheduler starts feeding tuplePool. Idempotent while inited is true.
   */
  public void init() throws IOException {
    closed.set(false);
    fetchProcessing.set(false);
    firstSchdeuled = true;
    //tuplePoolIndex = 0;
    if(tuplePool == null) {
      // next() may have nulled the pool out after a previous close.
      tuplePool = new ConcurrentLinkedQueue<Tuple>();
    }
    tuplePool.clear();

    if(!inited) {
      smContext.requestFileScan(this);
    }
    inited = true;
  }

  /** Re-reads the fragment from the beginning: close, then init again. */
  @Override
  public void reset() throws IOException {
    close();
    inited = false;

    init();
  }

  /**
   * Identifier used for thread names and logging. Includes a timestamp, so
   * two calls for the same fragment produce different ids.
   */
  public String getId() {
    return fragment.getPath().toString() + ":" + fragment.getStartOffset() + ":" +
        fragment.getLength() + "_" + System.currentTimeMillis();
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  @Override
  public void setTarget(Column[] targets) {
    if (inited) {
      throw new IllegalStateException("Should be called before init()");
    }
    this.targets = targets;
  }

  public Path getPath() {
    return fragment.getPath();
  }

  /**
   * Disk id of the fragment's first block, or -1 when no disk-location
   * information is available.
   */
  public int getDiskId() {
    if(fragment.getDiskIds().length <= 0) {
      return -1;
    } else {
      return fragment.getDiskIds()[0];
    }
  }

  public void setSearchCondition(Object expr) {
    if (inited) {
      throw new IllegalStateException("Should be called before init()");
    }
  }

  public void setStorageManagerContext(StorageManagerV2.StorgaeManagerContext context) {
    this.smContext = context;
  }

  /**
   * Back-pressure signal for the scheduler: true while more than 16 MB of
   * tuples are buffered and the consumer should drain before the next turn.
   */
  public boolean isFetchProcessing() {
//    return fetchProcessing.get();
    return tuplePoolMemory.get() > 16 * 1024 * 1024;
  }

  long lastScanScheduleTime;

  public String toString() {
    return fragment.getPath() + ":" + fragment.getStartOffset();
  }

  /**
   * One scan turn, run on a scheduler-owned thread: reads tuples into
   * tuplePool until roughly maxBytesPerSchedule bytes have been consumed from
   * the file (checked every 1000 records) or the fragment is exhausted, waking
   * the consumer periodically and at the end. Re-requests scheduling unless
   * the scanner has been closed.
   */
  public void scan(int maxBytesPerSchedule) throws IOException {
    if(firstSchdeuled) {
      initFirstScan();
      firstSchdeuled = false;
    }
    long scanStartPos = getFilePosition();
    int recordCount = 0;
    while(true) {
      Tuple tuple = getNextTuple();
      if(tuple == null) {
        break;
      }
      tuplePoolMemory.addAndGet(tuple.size());
      tuplePool.offer(tuple);
      recordCount++;
      if(recordCount % 1000 == 0) {
        if(getFilePosition() - scanStartPos >= maxBytesPerSchedule) {
          break;
        } else {
          // Wake a consumer blocked in next() so it can start draining.
          synchronized(tuplePool) {
            tuplePool.notifyAll();
          }
        }
      }
    }
    if(tuplePool != null) {
      synchronized(tuplePool) {
        tuplePool.notifyAll();
      }
    }
    if(!isClosed()) {
      smContext.requestFileScan(this);
    }
  }

  /** Test helper: blocks until notified on the fetchProcessing monitor. */
  public void waitScanStart() {
    //for test
    synchronized(fetchProcessing) {
      try {
        fetchProcessing.wait();
      } catch (InterruptedException e) {
      }
    }
  }

  /**
   * Marks the scanner closed and wakes any consumer blocked in next().
   * Closing the underlying file stream is left to subclasses.
   */
  @Override
  public void close() throws IOException {
    if(closed.get()) {
      return;
    }
    closed.set(true);

    synchronized(tuplePool) {
      tuplePool.notifyAll();
    }
    LOG.info(toString() + " closed");
  }

  public boolean isClosed() {
    return closed.get();
  }

  /**
   * Consumer side: returns the next buffered tuple, blocking while the pool
   * is empty and the scanner is still open; returns null at end of data.
   * NOTE(review): the poll happens outside synchronized(tuplePool), so a
   * producer offer+notifyAll landing between the failed poll and the wait()
   * is missed; progress then relies on the next periodic notify or close().
   * NOTE(review): setting tuplePool = null here can race with scan()/close(),
   * which still synchronize on tuplePool — verify lifecycle ordering.
   */
  public Tuple next() throws IOException {
    if(isClosed() && tuplePool == null) {
      return null;
    }
    while(true) {
      Tuple tuple = tuplePool.poll();
      if(tuple == null) {
        if(isClosed()) {
          tuplePool.clear();
          tuplePool = null;
          return null;
        }
        synchronized(tuplePool) {
          try {
            tuplePool.wait();
          } catch (InterruptedException e) {
            break;
          }
        }
      } else {
        tuplePoolMemory.addAndGet(0 - tuple.size());
        return tuple;
      }
    }

    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
new file mode 100644
index 0000000..11c3291
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.rcfile.BytesRefArrayWritable;
+import org.apache.tajo.storage.rcfile.ColumnProjectionUtils;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * Scanner over an RCFile (record-columnar file) fragment for the v2
+ * asynchronous storage manager.  Rows are read through RCFile.Reader between
+ * the fragment's start offset and start+length, and only the projected
+ * columns are materialized into VTuples.
+ */
+public class RCFileScanner extends FileScannerV2 {
+ private static final Log LOG = LogFactory.getLog(RCFileScanner.class);
+
+ private RCFile.Reader in;
+ // byte range assigned to this fragment: [start, end)
+ private long start;
+ private long end;
+ // false once the reader is exhausted or has passed 'end'
+ private boolean more = true;
+ // reusable row-key holder filled by RCFile.Reader.next()
+ private LongWritable key;
+ // reusable column buffer for the current row
+ private BytesRefArrayWritable column;
+ // projectionMap[i] = schema column id of target column i
+ private Integer [] projectionMap;
+
+ public RCFileScanner(final Configuration conf,
+ final TableMeta meta,
+ final Fragment fragment) throws IOException {
+ super(conf, meta, fragment);
+
+ this.start = fragment.getStartOffset();
+ this.end = start + fragment.getLength();
+ key = new LongWritable();
+ column = new BytesRefArrayWritable();
+ }
+
+ /**
+  * Produces the next tuple of the fragment, or null (after closing this
+  * scanner) when the fragment is exhausted.
+  */
+ @Override
+ protected Tuple getNextTuple() throws IOException {
+ more = next(key);
+
+ if (more) {
+ // load the current row's column bytes into the reusable buffer
+ column.clear();
+ in.getCurrentRow(column);
+ }
+
+ if(more) {
+ Tuple tuple = makeTuple();
+ return tuple;
+ } else {
+ close();
+ return null;
+ }
+ }
+
+ /**
+  * Decodes the projected columns of the current row (held in {@code column})
+  * into a new VTuple.  A zero-length column value is interpreted as NULL.
+  * @throws IOException if a projected column has an unsupported data type
+  */
+ private Tuple makeTuple() throws IOException {
+ column.resetValid(schema.getColumnNum());
+ Tuple tuple = new VTuple(schema.getColumnNum());
+ int tid; // target column id
+ for (int i = 0; i < projectionMap.length; i++) {
+ tid = projectionMap[i];
+ // if the column is byte[0], it presents a NULL value.
+ if (column.get(tid).getLength() == 0) {
+ tuple.put(tid, DatumFactory.createNullDatum());
+ } else {
+ switch (targets[i].getDataType().getType()) {
+ case BOOLEAN:
+ tuple.put(tid,
+ DatumFactory.createBool(column.get(tid).getBytesCopy()[0]));
+ break;
+ case BIT:
+ tuple.put(tid,
+ DatumFactory.createBit(column.get(tid).getBytesCopy()[0]));
+ break;
+ case CHAR:
+ byte[] buf = column.get(tid).getBytesCopy();
+ tuple.put(tid,
+ DatumFactory.createChar(buf));
+ break;
+
+ case INT2:
+ tuple.put(tid,
+ DatumFactory.createInt2(Bytes.toShort(
+ column.get(tid).getBytesCopy())));
+ break;
+ case INT4:
+ tuple.put(tid,
+ DatumFactory.createInt4(Bytes.toInt(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case INT8:
+ tuple.put(tid,
+ DatumFactory.createInt8(Bytes.toLong(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case FLOAT4:
+ tuple.put(tid,
+ DatumFactory.createFloat4(Bytes.toFloat(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case FLOAT8:
+ tuple.put(tid,
+ DatumFactory.createFloat8(Bytes.toDouble(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case INET4:
+ tuple.put(tid,
+ DatumFactory.createInet4(column.get(tid).getBytesCopy()));
+ break;
+
+ case TEXT:
+ tuple.put(tid,
+ DatumFactory.createText(
+ column.get(tid).getBytesCopy()));
+ break;
+
+ case BLOB:
+ tuple.put(tid,
+ DatumFactory.createBlob(column.get(tid).getBytesCopy()));
+ break;
+
+ default:
+ throw new IOException("Unsupport data type");
+ }
+ }
+ }
+
+ return tuple;
+ }
+
+ /**
+  * Defaults the projection to all schema columns when no explicit targets
+  * were set, then registers the projection before the base-class init.
+  */
+ @Override
+ public void init() throws IOException {
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ prepareProjection(targets);
+
+ super.init();
+ }
+
+ /**
+  * Builds projectionMap (target index -> schema column id) and tells the
+  * RCFile layer which column ids to actually read.
+  */
+ private void prepareProjection(Column[] targets) {
+ projectionMap = new Integer[targets.length];
+ int tid;
+ for (int i = 0; i < targets.length; i++) {
+ tid = schema.getColumnIdByName(targets[i].getColumnName());
+ projectionMap[i] = tid;
+ }
+ ArrayList<Integer> projectionIdList = new ArrayList<Integer>(TUtil.newList(projectionMap));
+ ColumnProjectionUtils.setReadColumnIDs(conf, projectionIdList);
+ }
+
+ /**
+  * Releases the reader and column buffer.  NOTE(review): any exception from
+  * in.close() is silently swallowed; consider at least logging it.
+  */
+ @Override
+ public void close() throws IOException {
+ try {
+ if(in != null) {
+ in.close();
+ in = null;
+ }
+ } catch (Exception e) {
+ }
+
+ if(column != null) {
+ column.clear();
+ column = null;
+ }
+ super.close();
+ }
+
+ /**
+  * Advances the reader to the next row, updating {@code more}.  Returns
+  * false once the reader is exhausted or the last seen sync marker is at or
+  * beyond the fragment's end offset (rows up to the next sync past 'end'
+  * belong to this fragment; later ones belong to the next fragment).
+  */
+ private boolean next(LongWritable key) throws IOException {
+ if (!more) {
+ return false;
+ }
+
+ more = in.next(key);
+ if (!more) {
+ return false;
+ }
+
+ long lastSeenSyncPos = in.lastSeenSyncPos();
+ if (lastSeenSyncPos >= end) {
+ more = false;
+ return more;
+ }
+ return more;
+ }
+
+ /**
+  * First-schedule hook: opens the reader and syncs it forward to the
+  * fragment's start offset.  Guarded by the inherited flag (note the
+  * pre-existing "firstSchdeuled" spelling -- defined in FileScannerV2, so
+  * it cannot be renamed here).
+  */
+ @Override
+ protected void initFirstScan() throws IOException {
+ if(!firstSchdeuled) {
+ return;
+ }
+ this.in = new RCFile.Reader(fs, fragment.getPath(), conf);
+
+ if (start > in.getPosition()) {
+ in.sync(start); // sync to start
+ }
+ // re-anchor 'start' at the actual post-sync reader position
+ this.start = in.getPosition();
+ more = start < end;
+ firstSchdeuled = false;
+ }
+
+ /** @return the reader's current byte position in the file. */
+ @Override
+ protected long getFilePosition() throws IOException {
+ return in.getPosition();
+ }
+
+ /**
+  * Delegates to the base-class reset.  NOTE(review): the seek back to the
+  * file start is commented out, so the reader position is not rewound here
+  * -- confirm FileScannerV2.reset() handles re-scheduling the scan.
+  */
+ @Override
+ public void reset() throws IOException {
+ //in.seek(0);
+ super.reset();
+ }
+
+ /** RCFile is columnar, so column projection is supported. */
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ /** Predicate push-down (selection) is not supported by this scanner. */
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ /** Fragments may be split at sync-marker boundaries. */
+ @Override
+ public boolean isSplittable(){
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
new file mode 100644
index 0000000..eca590f
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.storage.v2.StorageManagerV2.StorgaeManagerContext;
+
+/**
+ * Central dispatcher thread of the v2 storage manager: drains FileScannerV2
+ * requests from the shared scan queue and routes each one to the
+ * DiskFileScanScheduler responsible for the disk holding the file.
+ * Disk selection order: the scanner's own disk id if valid, else a lookup by
+ * mount-point prefix, else the disk with the shortest queue, else random.
+ */
+public class ScanScheduler extends Thread {
+ private static final Log LOG = LogFactory.getLog(ScanScheduler.class);
+
+ // shared monitor guarding the scan queue (owned by StorageManagerV2)
+ private Object scanQueueLock;
+ private StorgaeManagerContext context;
+
+ // all scanners ever dispatched, keyed by scanner id
+ // NOTE(review): entries appear never to be removed -- possible leak on
+ // long-running workers, confirm lifecycle elsewhere
+ private Map<String, FileScannerV2> requestMap = new HashMap<String, FileScannerV2>();
+
+ // one per-disk scheduler thread, keyed by disk device id
+ private Map<Integer, DiskFileScanScheduler> diskFileScannerMap = new HashMap<Integer, DiskFileScanScheduler>();
+
+ private Map<Integer, DiskDeviceInfo> diskDeviceInfoMap = new HashMap<Integer, DiskDeviceInfo>();
+
+ // mount points sorted so prefix lookup can resolve a path to its device
+ private SortedSet<DiskMountInfo> diskMountInfos = new TreeSet<DiskMountInfo>();
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private Random rand = new Random(System.currentTimeMillis());
+
+ /**
+  * Discovers the local disk devices and starts one DiskFileScanScheduler per
+  * device.  Discovery failures are logged and swallowed, leaving the
+  * scheduler with an empty disk map.
+  */
+ public ScanScheduler(StorgaeManagerContext context) {
+ this.context = context;
+ this.scanQueueLock = context.getScanQueueLock();
+
+ try {
+ List<DiskDeviceInfo> deviceInfos = DiskUtil.getDiskDeviceInfos();
+ for(DiskDeviceInfo eachInfo: deviceInfos) {
+ LOG.info("Create DiskScanQueue:" + eachInfo.getName());
+ diskDeviceInfoMap.put(eachInfo.getId(), eachInfo);
+
+ diskMountInfos.addAll(eachInfo.getMountInfos());
+ }
+
+ initFileScanners();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Main loop: poll a scanner from the queue, pick a disk for it, and hand it
+  * to that disk's scheduler; wait on the queue lock when the queue is empty.
+  * NOTE(review): scanQueueLock is held for the entire loop, including
+  * dispatch -- producers in requestFileScan() can only enqueue while this
+  * thread is inside wait().  The id-range check below also assumes device
+  * ids are contiguous from 0 -- TODO confirm against DiskUtil.
+  */
+ public void run() {
+ synchronized(scanQueueLock) {
+ while(!stopped.get()) {
+ FileScannerV2 fileScannerV2 = context.getScanQueue().poll();
+ if(fileScannerV2 == null) {
+ try {
+ scanQueueLock.wait();
+ } catch (InterruptedException e) {
+ break;
+ }
+ } else {
+ int diskId = fileScannerV2.getDiskId();
+
+ //LOG.info("Scan Scheduled:" + diskId + "," + fileScannerV2.toString());
+
+ if(diskId < 0 || diskId >= diskDeviceInfoMap.size()) {
+ // unknown disk: fall back to mount-path lookup, then least-loaded,
+ // then a random disk
+ diskId = findDiskPartitionPath(fileScannerV2.getPath().toString());
+ if(diskId < 0) {
+
+ diskId = findMinQueueDisk();
+ if(diskId < 0) {
+ diskId = rand.nextInt(diskDeviceInfoMap.size());
+ }
+ }
+ }
+
+ synchronized(diskFileScannerMap) {
+ requestMap.put(fileScannerV2.getId(), fileScannerV2);
+ DiskFileScanScheduler diskScheduler = diskFileScannerMap.get(diskId);
+ diskScheduler.requestScanFile(fileScannerV2);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * @return the id of the disk whose scheduler currently has the smallest
+  *         queue, or -1 when no disk schedulers exist.
+  */
+ private int findMinQueueDisk() {
+ int minValue = Integer.MAX_VALUE;
+ int minId = -1;
+ synchronized(diskFileScannerMap) {
+ for(DiskFileScanScheduler eachDiskScanner: diskFileScannerMap.values()) {
+ int queueSize = eachDiskScanner.getTotalQueueSize();
+ if(queueSize <= minValue) {
+ minValue = queueSize;
+ minId = eachDiskScanner.getDiskId();
+ }
+ }
+ }
+
+ return minId;
+ }
+
+ /**
+  * Resolves a file path to a disk device id by mount-point prefix match.
+  * @return the device id, or -1 when no mount point prefixes the path
+  */
+ private int findDiskPartitionPath(String fullPath) {
+ for (DiskMountInfo eachMountInfo : diskMountInfos) {
+ if (fullPath.indexOf(eachMountInfo.getMountPath()) == 0) {
+ return eachMountInfo.getDeviceId();
+ }
+ }
+
+ return -1;
+ }
+
+ /** Starts one DiskFileScanScheduler thread per discovered disk device. */
+ private void initFileScanners() {
+ for(Integer eachId: diskDeviceInfoMap.keySet()) {
+ DiskFileScanScheduler scanner = new DiskFileScanScheduler(context, diskDeviceInfoMap.get(eachId));
+ scanner.start();
+
+ diskFileScannerMap.put(eachId, scanner);
+ }
+ }
+
+ /**
+  * Stops this scheduler and every per-disk scheduler, then interrupts the
+  * dispatcher thread so a blocked wait() exits promptly.
+  */
+ public void stopScheduler() {
+ stopped.set(true);
+ for(DiskFileScanScheduler eachDiskScanner: diskFileScannerMap.values()) {
+ eachDiskScanner.stopScan();
+ }
+ this.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
new file mode 100644
index 0000000..1ba6048
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Scanner;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Version-2 storage manager providing asynchronous, disk-aware scanning.
+ * Scanner classes are resolved from "tajo.storage.scanner-handler.v2.*"
+ * configuration keys; FileScannerV2 instances enqueue themselves through
+ * requestFileScan() and a background ScanScheduler dispatches them to
+ * per-disk scheduler threads.
+ */
+public class StorageManagerV2 extends AbstractStorageManager {
+ private final Log LOG = LogFactory.getLog(StorageManagerV2.class);
+
+ // pending scan requests; guarded by scanQueueLock
+ private Queue<FileScannerV2> scanQueue = new LinkedList<FileScannerV2>();
+
+ private Object scanQueueLock = new Object();
+
+ private Object scanDataLock = new Object();
+
+ private ScanScheduler scanScheduler;
+
+ private StorgaeManagerContext context;
+
+ /** Creates the shared context and starts the background scan scheduler. */
+ public StorageManagerV2(TajoConf conf) throws IOException {
+ super(conf);
+ context = new StorgaeManagerContext();
+ scanScheduler = new ScanScheduler(context);
+ scanScheduler.start();
+ LOG.info("StorageManager v2 started...");
+ }
+
+ /**
+  * Instantiates a v2 scanner for the given table meta and fragment, applying
+  * the projection target and wiring the scanner to this manager's context.
+  * The resolved class is memoized under "<type>_v2" in SCANNER_HANDLER_CACHE.
+  * NOTE(review): when the config key is missing, a null is put into the
+  * cache before the null check throws -- harmless (the lookup just repeats
+  * next call) but the put could be moved after the check.
+  * @throws IOException when no scanner class is configured for the type
+  */
+ @Override
+ public Scanner getScanner(TableMeta meta, Fragment fragment,
+ Schema target) throws IOException {
+ Scanner scanner;
+
+ Class<? extends Scanner> scannerClass;
+
+ String handlerName = meta.getStoreType().name().toLowerCase();
+ String handlerNameKey = handlerName + "_v2";
+
+ scannerClass = SCANNER_HANDLER_CACHE.get(handlerNameKey);
+ if (scannerClass == null) {
+ scannerClass = conf.getClass(
+ String.format("tajo.storage.scanner-handler.v2.%s.class",
+ meta.getStoreType().name().toLowerCase()), null,
+ Scanner.class);
+ SCANNER_HANDLER_CACHE.put(handlerNameKey, scannerClass);
+ }
+
+ if (scannerClass == null) {
+ throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+ }
+
+ scanner = newScannerInstance(scannerClass, conf, meta, fragment);
+ if (scanner.isProjectable()) {
+ scanner.setTarget(target.toArray());
+ }
+
+ // v2 scanners need the shared context to enqueue themselves for scheduling
+ if(scanner instanceof FileScannerV2) {
+ ((FileScannerV2)scanner).setStorageManagerContext(context);
+ }
+ return scanner;
+ }
+
+ /** Enqueues a scanner for asynchronous scheduling and wakes the scheduler. */
+ public void requestFileScan(FileScannerV2 fileScanner) {
+ synchronized(scanQueueLock) {
+ scanQueue.offer(fileScanner);
+
+ scanQueueLock.notifyAll();
+ }
+ }
+
+ public StorgaeManagerContext getContext() {
+ return context;
+ }
+
+ /**
+  * Facade exposing the manager's shared locks, queue, and config to the
+  * scheduler and scanner classes.  NOTE(review): the class name misspells
+  * "Storage" as "Storgae"; it is referenced from other files in this patch
+  * (ScanScheduler, FileScannerV2), so renaming must be done patch-wide.
+  */
+ public class StorgaeManagerContext {
+ public Object getScanQueueLock() {
+ return scanQueueLock;
+ }
+
+ public Object getScanDataLock() {
+ return scanDataLock;
+ }
+
+ public Queue<FileScannerV2> getScanQueue() {
+ return scanQueue;
+ }
+
+ /** Max bytes a scanner may read per scheduling slot (default 8MB). */
+ public int getMaxReadBytesPerScheduleSlot() {
+ return conf.getInt("tajo.storage.manager.maxReadBytes", 8 * 1024 * 1024); //8MB
+ }
+
+ public void requestFileScan(FileScannerV2 fileScanner) {
+ StorageManagerV2.this.requestFileScan(fileScanner);
+ }
+
+ public TajoConf getConf() {
+ return conf;
+ }
+ }
+
+ /** Stops the background scheduler and all its per-disk threads. */
+ public void stop() {
+ if(scanScheduler != null) {
+ scanScheduler.stopScheduler();
+ }
+ }
+}
[3/4] TAJO-178: Implements StorageManager for scanning
asynchronously. (hyoungjunkim via hyunsik)
Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
new file mode 100644
index 0000000..628e7bc
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<style type="text/css" >
+/* NOTE(review): this file ships as style.css, yet its content is wrapped in
+   HTML <style> tags; a browser loading it via <link rel="stylesheet"> will
+   treat the tags as invalid CSS. Confirm whether the file is instead meant
+   to be inlined into the JSP pages, otherwise drop the tags. */
+
+ /* headings */
+ h5 {}
+ h1 {
+ font-size:25pt;
+ font-weight:bold;
+ }
+ h3 {
+ margin-top:5px;
+ margin-bottom:5px;
+ margin-left: 4px;
+ font-size:15pt;
+ font-weight:bold;
+ }
+ h2 {
+ margin-top:5px;
+ margin-bottom:5px;
+ margin-left:4px;
+ font-size:18pt;
+ font-weight:bold;
+ }
+ h2.line {
+ margin-top:45px;
+ margin-bottom:15px;
+ margin-left:4px;
+ color:#333333;
+ font-size:18pt;
+ font-weight:bold;
+ border-bottom:1px solid #999999;
+ }
+ h2.compactline{
+ margin-top:5px;
+ margin-bottom:5px;
+ margin-left:4px;
+ margin-right:4px;
+ color:#333333;
+ font-size:18pt;
+ font-weight:bold;
+ border-bottom:1px solid #999999;
+ }
+ /* tables */
+ td {
+ border: 1px solid #999999;
+ padding-left : 15px;
+ padding-top:2px;
+ padding-bottom:2px;
+ margin : 0px;
+ word-wrap: break-word;
+ }
+ td.long {
+ width:450px;
+ border: 1px solid #999999;
+ padding : 2px;
+ margin : 0px;
+ text-align:center;
+ }
+ th {
+ border: 1px solid #777777;
+ font-weight:bold;
+ color:#333333;
+ background-color:#cccccc;
+ text-align:left;
+ padding-left:15px;
+ padding-top:3px;
+ padding-bottom:3px;
+ }
+ th.small {
+ font-weight:bold;
+ width:100px;
+ }
+ table {
+ border-collapse:collapse;
+ }
+ table.new {
+ border-collapse:collapse;
+ width:95%;
+ border:1px solid #999999;
+ padding:5px;
+ table-layout:fixed;
+ }
+ table.noborder {
+ border-collapse:collapse;
+ width:98%;
+ border:none;
+ margin-top:5px;
+ }
+ td.rightborder{
+ padding-left:0px;
+ border-right:1px solid #cccccc;
+ border-top:none;
+ border-left:none;
+ border-bottom:none;
+ text-align:center;
+ }
+ td.noborder{
+ padding-left:0px;
+ border:none;
+ text-align:center;
+ }
+ th.rightbottom{
+ padding-left:0px;
+ border-right:1px solid #cccccc;
+ border-bottom:1px solid #cccccc;
+ border-top:none;
+ border-left:none;
+ background-color:#ffffff;
+ text-align:center;
+ }
+ th.bottom{
+ padding-left:0px;
+ border-right:none;
+ border-bottom:1px solid #cccccc;
+ border-top:none;
+ border-left:none;
+ background-color:#ffffff;
+ text-align:center;
+ }
+ iframe {
+ width:1024px;
+ overflow:hidden;
+ border:0px;
+ padding:0px;
+ }
+ /* page layout containers */
+ div.tajoinfo {
+ width :350px;
+ height:45px;
+ border:1px solid black;
+ margin-top:80px;
+ margin-bottom:2px;
+ margin-left:80px;
+ margin-right:2px;
+ float:left;
+ }
+ div.tajoimage {
+ width:450px;
+ height:125px;
+ margin:3px;
+ float:left;
+ }
+ div.container {
+ width:860px;
+ margin:auto;
+ overflow:auto;
+ }
+ .topcontainer {
+ border:1px solid green;
+ margin:auto;
+ min-height:400px;
+ overflow:auto;
+ }
+ div.leftbox {
+ width:450px;
+ margin:6px;
+ padding:5px;
+ float:left;
+ }
+ div.bottombox {
+ margin:5px;
+ padding:5px;
+ }
+ div.leftcontent {
+ border:1px solid black;
+ margin:auto;
+ min-height:400px;
+ }
+ div.titlebox {
+ width:inherit;
+ border:2px solid #999999;
+ }
+ div.contentbox {
+ padding-top:5px;
+ padding-bottom:5px;
+ width:inherit;
+ border-left:2px solid #999999;
+ border-right:2px solid #999999;
+ border-bottom:1px solid #999999;
+ border-top:1px solid #999999;
+ overflow:hidden;
+ word-wrap:break-word;
+ }
+ div#tajotitle {
+ height:40px;
+ border:1px solid black;
+ margin-top:5px;
+ margin-bottom:5px;
+ }
+ div.jobbox {
+ width:300px;
+ margin:6px;
+ padding:5px;
+ float:left;
+ }
+ div#bottomtabletitle {
+ width:700px;
+ border:2px solid #999999;
+ }
+ div.center {
+ margin-top:10px;
+ text-align:center;
+ }
+ div.headline {
+ background-color:#999999;
+ padding:3px;
+ text-align:center;
+ }
+ div.headline_2 {
+ background-color:#999999;
+ padding:3px;
+ }
+ div.command {
+ margin:auto;
+ width:600px;
+ height:320px;
+ text-align:center;
+ }
+ textarea.command {
+ margin:3px;
+ width:550px;
+ height:310px;
+ }
+ hr {
+ border:1px solid #999999;
+ }
+ /* links and text */
+ a.headline {
+ color:#ffffff;
+ font-size:13pt;
+ font-weight:bold;
+ text-decoration:none;
+ }
+ a.tablelink {
+ color:#000000;
+ text-decoration:none;
+ }
+ a.tablelink:hover {
+ color:#666666;
+ text-decoration:none;
+ }
+ font.maintitle {
+ margin-left:35px;
+ font-size:35pt;
+ font-weight:bold;
+ }
+ font.subtitle {
+ margin-left:35px;
+ font-size:25pt;
+ font-weight:bold;
+ }
+ /* status indicators */
+ ul li.off {
+ color:#ee4444;
+ }
+ ul li.on {
+ color:#33dd33;
+ }
+ div.outbox {
+ margin-top:5px;
+ padding-bottom:5px;
+ border:1px solid #cccccc;
+ }
+ div.outbox_order {
+ margin-top:7px;
+ padding-bottom:5px;
+ border:1px solid #cccccc;
+ width:425px;
+ height:auto;
+ float:left;
+ }
+
+ </style>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 5a85a07..d1efc1c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -21,21 +21,19 @@
*/
package org.apache.tajo;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.FileUtil;
import java.io.IOException;
-import java.util.UUID;
public class BackendTestingUtil {
public final static Schema mockupSchema;
@@ -51,7 +49,7 @@ public class BackendTestingUtil {
public static void writeTmpTable(TajoConf conf, Path path,
String tableName, boolean writeMeta)
throws IOException {
- StorageManager sm = StorageManager.get(conf, path);
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, path);
FileSystem fs = sm.getFileSystem();
Appender appender;
@@ -64,7 +62,7 @@ public class BackendTestingUtil {
if (writeMeta) {
FileUtil.writeProto(fs, new Path(tablePath.getParent(), ".meta"), mockupMeta.getProto());
}
- appender = StorageManager.getAppender(conf, mockupMeta, tablePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(mockupMeta, tablePath);
appender.init();
int deptSize = 10000;
@@ -85,28 +83,6 @@ public class BackendTestingUtil {
writeTmpTable(conf, new Path(parent), tableName, writeMeta);
}
- private TajoConf conf;
- private CatalogService catalog;
- private SQLAnalyzer analyzer;
- private LogicalPlanner planner;
- private LogicalOptimizer optimizer;
-
public BackendTestingUtil(TajoConf conf) throws IOException {
- this.conf = conf;
- this.catalog = new LocalCatalogWrapper(conf);
- analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
- }
-
- public static Path createTmpTestDir() throws IOException {
- String randomStr = UUID.randomUUID().toString();
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path dir = new Path("target/test-data", randomStr);
- // Have it cleaned up on exit
- if (fs.exists(dir)) {
- fs.delete(dir, true);
- }
- return dir;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 88029ea..e3b5fe7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -44,7 +44,6 @@ import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
import java.io.*;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.sql.ResultSet;
@@ -189,6 +188,8 @@ public class TajoTestingCluster {
// Do old style too just to be safe.
this.conf.set("fs.default.name", defaultFS.getUri().toString());
+ this.conf.set(TajoConf.ConfVars.ROOT_DIR.name(), defaultFS.getUri() + "/tajo");
+
return this.dfsCluster;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index fd492be..84a5c50 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -22,10 +22,6 @@ import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.apache.tajo.BackendTestingUtil;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TajoTestingCluster;
@@ -33,6 +29,10 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Set;
@@ -41,7 +41,7 @@ import static org.junit.Assert.*;
@Category(IntegrationTest.class)
public class TestTajoClient {
- private static TajoTestingCluster util;
+ private static TajoTestingCluster cluster;
private static TajoConf conf;
private static TajoClient tajo;
private static String TEST_PATH = "target/test-data/"
@@ -50,9 +50,9 @@ public class TestTajoClient {
@BeforeClass
public static void setUp() throws Exception {
- util = new TajoTestingCluster();
- util.startMiniCluster(1);
- conf = util.getConfiguration();
+ cluster = new TajoTestingCluster();
+ cluster.startMiniCluster(1);
+ conf = cluster.getConfiguration();
Thread.sleep(3000);
tajo = new TajoClient(conf);
@@ -61,7 +61,7 @@ public class TestTajoClient {
@AfterClass
public static void tearDown() throws Exception {
- util.shutdownMiniCluster();
+ cluster.shutdownMiniCluster();
if(tajo != null) {
tajo.close();
}
@@ -114,7 +114,7 @@ public class TestTajoClient {
@Test
public final void testCreateAndDropExternalTableByExecuteQuery() throws IOException, ServiceException {
- TajoConf conf = util.getConfiguration();
+ TajoConf conf = cluster.getConfiguration();
final String tableName = "testCreateAndDropExternalTableByExecuteQuery";
BackendTestingUtil.writeTmpTable(conf, "file:///tmp", tableName, false);
@@ -135,7 +135,7 @@ public class TestTajoClient {
@Test
public final void testCreateAndDropTableByExecuteQuery() throws IOException, ServiceException {
- TajoConf conf = util.getConfiguration();
+ TajoConf conf = cluster.getConfiguration();
final String tableName = "testCreateAndDropTableByExecuteQuery";
assertFalse(tajo.existTable(tableName));
@@ -145,8 +145,8 @@ public class TestTajoClient {
tajo.updateQuery(tql);
assertTrue(tajo.existTable(tableName));
- FileSystem hdfs = FileSystem.get(conf);
Path tablePath = tajo.getTableDesc(tableName).getPath();
+ FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
tajo.updateQuery("drop table " + tableName);
@@ -156,7 +156,7 @@ public class TestTajoClient {
@Test
public final void testDDLByExecuteQuery() throws IOException, ServiceException {
- TajoConf conf = util.getConfiguration();
+ TajoConf conf = cluster.getConfiguration();
final String tableName = "testDDLByExecuteQuery";
BackendTestingUtil.writeTmpTable(conf, "file:///tmp", tableName, false);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index ede73c5..66060ce 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -66,7 +66,7 @@ public class TestGlobalQueryPlanner {
private static LogicalPlanner logicalPlanner;
private static LogicalOptimizer optimizer;
private static QueryId queryId;
- private static StorageManager sm;
+ private static AbstractStorageManager sm;
@BeforeClass
public static void setup() throws Exception {
@@ -89,7 +89,7 @@ public class TestGlobalQueryPlanner {
catalog.registerFunction(funcDesc);
}
- sm = new StorageManager(util.getConfiguration());
+ sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4));
@@ -100,7 +100,7 @@ public class TestGlobalQueryPlanner {
dispatcher.init(conf);
dispatcher.start();
- planner = new GlobalPlanner(conf, new StorageManager(conf),
+ planner = new GlobalPlanner(conf, sm,
dispatcher.getEventHandler());
analyzer = new SQLAnalyzer();
logicalPlanner = new LogicalPlanner(catalog);
@@ -124,7 +124,7 @@ public class TestGlobalQueryPlanner {
fs.delete(tablePath.getParent(), true);
}
fs.mkdirs(tablePath.getParent());
- appender = StorageManager.getAppender(conf, meta, tablePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
tupleNum = 100;
for (j = 0; j < tupleNum; j++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index c665b44..7572ad5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -79,7 +79,7 @@ public class TestGlobalQueryOptimizer {
conf = new TajoConf(util.getConfiguration());
catalog = util.getMiniCatalogCluster().getCatalog();
- StorageManager sm = new StorageManager(util.getConfiguration());
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4));
@@ -88,7 +88,7 @@ public class TestGlobalQueryOptimizer {
AsyncDispatcher dispatcher = new AsyncDispatcher();
- planner = new GlobalPlanner(conf, new StorageManager(conf),
+ planner = new GlobalPlanner(conf, sm,
dispatcher.getEventHandler());
analyzer = new SQLAnalyzer();
logicalPlanner = new LogicalPlanner(catalog);
@@ -112,7 +112,7 @@ public class TestGlobalQueryOptimizer {
fs.delete(tablePath.getParent(), true);
}
fs.mkdirs(tablePath.getParent());
- appender = StorageManager.getAppender(conf, meta, tablePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
tupleNum = 100;
for (j = 0; j < tupleNum; j++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 97459f9..b714981 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -54,7 +54,7 @@ public class TestBNLJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private static int OUTER_TUPLE_NUM = 1000;
@@ -69,7 +69,7 @@ public class TestBNLJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -79,7 +79,7 @@ public class TestBNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
@@ -100,7 +100,7 @@ public class TestBNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 1c651f6..8021882 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -59,7 +59,7 @@ public class TestBSTIndexExec {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Schema idxSchema;
private TupleComparator comp;
private BSTIndex.BSTIndexWriter writer;
@@ -82,7 +82,7 @@ public class TestBSTIndexExec {
catalog = util.getMiniCatalogCluster().getCatalog();
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
- sm = StorageManager.get(conf, workDir);
+ sm = StorageManagerFactory.getStorageManager(conf, workDir);
idxPath = new Path(workDir, "test.idx");
@@ -108,7 +108,7 @@ public class TestBSTIndexExec {
fs = tablePath.getFileSystem(conf);
fs.mkdirs(tablePath.getParent());
- FileAppender appender = (FileAppender)StorageManager.getAppender(conf, meta, tablePath);
+ FileAppender appender = (FileAppender)StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
for (int i = 0; i < 10000; i++) {
@@ -150,7 +150,9 @@ public class TestBSTIndexExec {
@Test
public void testEqual() throws Exception {
-
+ if(conf.getBoolean("tajo.storage.manager.v2", false)) {
+ return;
+ }
this.rndKey = rnd.nextInt(250);
final String QUERY = "select * from employee where managerId = " + rndKey;
@@ -180,7 +182,7 @@ public class TestBSTIndexExec {
}
private class TmpPlanner extends PhysicalPlannerImpl {
- public TmpPlanner(TajoConf conf, StorageManager sm) {
+ public TmpPlanner(TajoConf conf, AbstractStorageManager sm) {
super(conf, sm);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 0fc3773..01fd370 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -51,7 +51,7 @@ public class TestExternalSortExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
@@ -67,7 +67,7 @@ public class TestExternalSortExec {
util = new TajoTestingCluster();
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -76,7 +76,7 @@ public class TestExternalSortExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.enableStats();
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -138,6 +138,7 @@ public class TestExternalSortExec {
int cnt = 0;
exec.init();
long start = System.currentTimeMillis();
+
while ((tuple = exec.next()) != null) {
curVal = tuple.get(0);
if (preVal != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 8d80d9e..886dddc 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -51,7 +51,7 @@ public class TestHashAntiJoinExec {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -64,7 +64,7 @@ public class TestHashAntiJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestHashAntiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -100,7 +100,7 @@ public class TestHashAntiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 10; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index e270df3..cf89cf8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -53,7 +53,7 @@ public class TestHashJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -66,7 +66,7 @@ public class TestHashJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -77,7 +77,7 @@ public class TestHashJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 10; i++) {
@@ -99,7 +99,7 @@ public class TestHashJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 10; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 317c1f2..d986a8f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -51,7 +51,7 @@ public class TestHashSemiJoinExec {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -64,7 +64,7 @@ public class TestHashSemiJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestHashSemiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -100,7 +100,7 @@ public class TestHashSemiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
// make 27 tuples
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 776882b..e77a734 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -55,7 +55,7 @@ public class TestMergeJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private TableDesc employee;
private TableDesc people;
@@ -68,7 +68,7 @@ public class TestMergeJoinExec {
Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
FileSystem fs = testDir.getFileSystem(conf);
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -79,7 +79,7 @@ public class TestMergeJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 10; i++) {
@@ -108,7 +108,7 @@ public class TestMergeJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 10; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 9289dc9..2d82f6c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -53,7 +53,7 @@ public class TestNLJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -65,7 +65,7 @@ public class TestNLJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 50; i++) {
@@ -99,7 +99,7 @@ public class TestNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 50; i += 2) {
@@ -151,7 +151,8 @@ public class TestNLJoinExec {
int i = 0;
exec.init();
- while (exec.next() != null) {
+ Tuple tuple = null;
+ while ( (tuple = exec.next()) != null) {
i++;
}
exec.close();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 75e3b1e..5358d3a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -69,7 +69,7 @@ public class TestPhysicalPlanner {
private static SQLAnalyzer analyzer;
private static LogicalPlanner planner;
private static LogicalOptimizer optimizer;
- private static StorageManager sm;
+ private static AbstractStorageManager sm;
private static Path testDir;
private static TableDesc employee = null;
@@ -82,7 +82,7 @@ public class TestPhysicalPlanner {
util.startCatalogCluster();
conf = util.getConfiguration();
testDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
catalog.registerFunction(funcDesc);
@@ -107,7 +107,7 @@ public class TestPhysicalPlanner {
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 100; i++) {
@@ -123,7 +123,7 @@ public class TestPhysicalPlanner {
Path scorePath = new Path(testDir, "score");
TableMeta scoreMeta = CatalogUtil.newTableMeta(scoreSchema, StoreType.CSV, new Options());
- appender = StorageManager.getAppender(conf, scoreMeta, scorePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scorePath);
appender.init();
score = new TableDescImpl("score", scoreMeta, scorePath);
tuple = new VTuple(score.getMeta().getSchema().getColumnNum());
@@ -189,7 +189,7 @@ public class TestPhysicalPlanner {
optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
Tuple tuple;
@@ -372,7 +372,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = StorageManager.getScanner(conf, outputMeta, ctx.getOutputPath());
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath());
scanner.init();
Tuple tuple;
int i = 0;
@@ -412,7 +412,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = StorageManager.getScanner(conf, outputMeta, ctx.getOutputPath());
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath());
scanner.init();
Tuple tuple;
int i = 0;
@@ -787,8 +787,8 @@ public class TestPhysicalPlanner {
reader.open();
Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
TableMeta meta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV, new Options());
- SeekableScanner scanner = (SeekableScanner)
- StorageManager.getScanner(conf, meta, outputPath);
+ SeekableScanner scanner =
+ StorageManagerFactory.getSeekableScanner(conf, meta, outputPath);
scanner.init();
int cnt = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 0151cb3..06c5bb7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -50,7 +50,7 @@ public class TestSortExec {
private static SQLAnalyzer analyzer;
private static LogicalPlanner planner;
private static LogicalOptimizer optimizer;
- private static StorageManager sm;
+ private static AbstractStorageManager sm;
private static TajoTestingCluster util;
private static Path workDir;
private static Path tablePath;
@@ -64,7 +64,7 @@ public class TestSortExec {
util = new TajoTestingCluster();
catalog = util.startCatalogCluster().getCatalog();
workDir = CommonTestingUtil.getTestDir(TEST_PATH);
- sm = StorageManager.get(conf, workDir);
+ sm = StorageManagerFactory.getStorageManager(conf, workDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -76,7 +76,7 @@ public class TestSortExec {
tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
sm.getFileSystem().mkdirs(tablePath.getParent());
- Appender appender = StorageManager.getAppender(conf, employeeMeta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, tablePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 100; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
index ba7d36b..d006679 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
@@ -22,22 +22,19 @@
package org.apache.tajo.engine.query;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.sql.ResultSetMetaData;
@@ -49,8 +46,8 @@ import static org.junit.Assert.*;
public class TestResultSetImpl {
private static TajoTestingCluster util;
private static TajoConf conf;
- private static StorageManager sm;
private static TableDesc desc;
+ private static AbstractStorageManager sm;
private static TableMeta scoreMeta;
@BeforeClass
@@ -58,7 +55,7 @@ public class TestResultSetImpl {
util = new TajoTestingCluster();
util.startMiniCluster(3);
conf = util.getConfiguration();
- sm = new StorageManager(conf);
+ sm = StorageManagerFactory.getStorageManager(conf);
Schema scoreSchema = new Schema();
scoreSchema.addColumn("deptname", Type.TEXT);
@@ -68,7 +65,7 @@ public class TestResultSetImpl {
Path p = sm.getTablePath("score");
sm.getFileSystem().mkdirs(p);
- Appender appender = StorageManager.getAppender(conf, scoreMeta, new Path(p, "score"));
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, new Path(p, "score"));
appender.init();
int deptSize = 100;
int tupleNum = 10000;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index a67dd26..b5ce437 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -33,7 +33,8 @@ import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -68,7 +69,7 @@ public class TestExecutionBlockCursor {
logicalPlanner = new LogicalPlanner(catalog);
optimizer = new LogicalOptimizer();
- StorageManager sm = new StorageManager(conf);
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index e44ca99..c070c4d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -19,13 +19,9 @@
package org.apache.tajo.storage;
import com.google.common.collect.Sets;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
@@ -35,10 +31,14 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.util.FileUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
import java.util.Set;
@@ -46,20 +46,20 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
public class TestRowFile {
- private TajoTestingCluster util;
- private Configuration conf;
+ private TajoTestingCluster cluster;
+ private TajoConf conf;
@Before
public void setup() throws Exception {
- util = new TajoTestingCluster();
- conf = util.getConfiguration();
+ cluster = new TajoTestingCluster();
+ conf = cluster.getConfiguration();
conf.setInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname, 100);
- util.startMiniDFSCluster(1);
+ cluster.startMiniDFSCluster(1);
}
@After
public void teardown() throws Exception {
- util.shutdownMiniDFSCluster();
+ cluster.shutdownMiniDFSCluster();
}
@Test
@@ -71,15 +71,18 @@ public class TestRowFile {
TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.ROWFILE);
- Path tablePath = new Path("hdfs:///test");
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf,
+ new Path(conf.get(TajoConf.ConfVars.ROOT_DIR.name())));
+
+ Path tablePath = new Path("/test");
Path metaPath = new Path(tablePath, ".meta");
Path dataPath = new Path(tablePath, "test.tbl");
- FileSystem fs = tablePath.getFileSystem(conf);
+ FileSystem fs = sm.getFileSystem();
fs.mkdirs(tablePath);
- FileUtil.writeProto(util.getDefaultFileSystem(), metaPath, meta.getProto());
+ FileUtil.writeProto(fs, metaPath, meta.getProto());
- Appender appender = StorageManager.getAppender(conf, meta, dataPath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, dataPath);
appender.enableStats();
appender.init();
@@ -96,7 +99,6 @@ public class TestRowFile {
tuple.put(2, stringDatum);
appender.addTuple(tuple);
idSet.add(i+1);
-// System.out.println(tuple.toString());
}
long end = System.currentTimeMillis();
@@ -105,21 +107,20 @@ public class TestRowFile {
TableStat stat = appender.getStats();
assertEquals(tupleNum, stat.getNumRows().longValue());
- System.out.println("append time: " + (end-start));
+ System.out.println("append time: " + (end - start));
FileStatus file = fs.getFileStatus(dataPath);
TableProto proto = (TableProto) FileUtil.loadProto(
- util.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
+ cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
meta = new TableMetaImpl(proto);
Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen());
int tupleCnt = 0;
start = System.currentTimeMillis();
- Scanner scanner = StorageManager.getScanner(conf, meta, fragment);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment);
scanner.init();
while ((tuple=scanner.next()) != null) {
tupleCnt++;
-// System.out.println(tuple.toString());
}
scanner.close();
end = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index cf1e9ae..4f2795b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -64,7 +64,7 @@ public class TestRangeRetrieverHandler {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Schema schema;
private static int TEST_TUPLE = 10000;
private FileSystem fs;
@@ -78,7 +78,7 @@ public class TestRangeRetrieverHandler {
fs = testDir.getFileSystem(conf);
util.startCatalogCluster();
catalog = util.getMiniCatalogCluster().getCatalog();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
@@ -108,7 +108,7 @@ public class TestRangeRetrieverHandler {
Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
fs.mkdirs(tableDir.getParent());
- Appender appender = sm.getAppender(conf, employeeMeta, tableDir);
+ Appender appender = sm.getAppender(employeeMeta, tableDir);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -161,8 +161,10 @@ public class TestRangeRetrieverHandler {
BSTIndex.BSTIndexReader reader = bst.getIndexReader(
new Path(testDir, "output/index"), keySchema, comp);
reader.open();
- SeekableScanner scanner = (SeekableScanner)
- sm.getScanner(conf, employeeMeta, StorageUtil.concatPath(testDir, "output", "output"));
+
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, employeeMeta,
+ StorageUtil.concatPath(testDir, "output", "output"));
+
scanner.init();
int cnt = 0;
while(scanner.next() != null) {
@@ -220,7 +222,7 @@ public class TestRangeRetrieverHandler {
TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv");
fs.mkdirs(tablePath.getParent());
- Appender appender = sm.getAppender(conf, meta, tablePath);
+ Appender appender = sm.getAppender(meta, tablePath);
appender.init();
Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) {
@@ -271,8 +273,8 @@ public class TestRangeRetrieverHandler {
BSTIndex.BSTIndexReader reader = bst.getIndexReader(
new Path(testDir, "output/index"), keySchema, comp);
reader.open();
- SeekableScanner scanner = (SeekableScanner) StorageManager.getScanner(
- conf, meta, StorageUtil.concatPath(testDir, "output", "output"));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta,
+ StorageUtil.concatPath(testDir, "output", "output"));
scanner.init();
int cnt = 0;
while(scanner.next() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
new file mode 100644
index 0000000..2b59ecb
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.TableMetaImpl;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
public abstract class AbstractStorageManager {
  private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);

  // Configuration this manager was constructed with.
  protected final TajoConf conf;
  // File system of the Tajo root directory (ConfVars.ROOT_DIR).
  protected final FileSystem fs;
  // Tajo root directory (ConfVars.ROOT_DIR).
  protected final Path baseDir;
  // Warehouse directory under which per-table directories live.
  protected final Path tableBaseDir;
  // Whether the DataNodes expose block->volume (disk) metadata; controlled by
  // 'dfs.datanode.hdfs-blocks-metadata.enabled'.
  protected final boolean blocksMetadataEnabled;

  /**
   * Cache of scanner handlers for each storage type.
   */
  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
      = new ConcurrentHashMap<String, Class<? extends Scanner>>();

  /**
   * Cache of appender handlers for each storage type.
   */
  protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
      = new ConcurrentHashMap<String, Class<? extends FileAppender>>();

  /**
   * Cache of constructors for each class. Pins the classes so they
   * can't be garbage collected until ReflectionUtils can be collected.
   */
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
      new ConcurrentHashMap<Class<?>, Constructor<?>>();

  /**
   * Returns a scanner reading only the {@code target} columns of the given
   * fragment. Concrete managers decide the scanner implementation.
   */
  public abstract Scanner getScanner(TableMeta meta, Fragment fragment,
                                     Schema target) throws IOException;
+
  /**
   * Resolves the root and warehouse paths and their file system from the
   * given configuration, and probes whether HDFS block-to-disk metadata is
   * available for disk-aware scheduling.
   *
   * @param conf Tajo configuration
   * @throws IOException if the root directory's file system cannot be opened
   */
  protected AbstractStorageManager(TajoConf conf) throws IOException {
    this.conf = conf;
    this.baseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR));
    this.tableBaseDir = TajoConf.getWarehousePath(conf);
    this.fs = baseDir.getFileSystem(conf);
    // Disk-aware split placement requires the DataNode-side feature flag.
    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
    if (!this.blocksMetadataEnabled)
      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
  }
+
+ public Scanner getScanner(TableMeta meta, Path path)
+ throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus status = fs.getFileStatus(path);
+ Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
+ return getScanner(meta, fragment);
+ }
+
  /**
   * Returns a scanner over the whole fragment, projecting every column of
   * the table's schema.
   */
  public Scanner getScanner(TableMeta meta, Fragment fragment)
      throws IOException {
    return getScanner(meta, fragment, meta.getSchema());
  }
+
  /** Returns the file system of the Tajo root directory. */
  public FileSystem getFileSystem() {
    return this.fs;
  }

  /** Returns the Tajo root directory path. */
  public Path getBaseDir() {
    return this.baseDir;
  }

  /** Returns the warehouse directory that holds table directories. */
  public Path getTableBaseDir() {
    return this.tableBaseDir;
  }
+
  /** Recursively deletes the given table path and all of its contents. */
  public void delete(Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);
    fs.delete(tablePath, true);
  }

  /** Returns true if the path exists on its file system. */
  public boolean exists(Path path) throws IOException {
    FileSystem fileSystem = path.getFileSystem(conf);
    return fileSystem.exists(path);
  }

  /**
   * This method deletes only data contained in the given path.
   *
   * @param path The path in which data are deleted.
   * @throws IOException
   */
  public void deleteData(Path path) throws IOException {
    FileSystem fileSystem = path.getFileSystem(conf);
    FileStatus[] fileLists = fileSystem.listStatus(path);
    for (FileStatus status : fileLists) {
      // Delete each child recursively but keep the directory itself.
      fileSystem.delete(status.getPath(), true);
    }
  }
+
  /** Returns the directory of the named table under the warehouse directory. */
  public Path getTablePath(String tableName) {
    return new Path(tableBaseDir, tableName);
  }
+
+ public Appender getAppender(TableMeta meta, Path path)
+ throws IOException {
+ Appender appender;
+
+ Class<? extends FileAppender> appenderClass;
+
+ String handlerName = meta.getStoreType().name().toLowerCase();
+ appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+ if (appenderClass == null) {
+ appenderClass = conf.getClass(
+ String.format("tajo.storage.appender-handler.%s.class",
+ meta.getStoreType().name().toLowerCase()), null,
+ FileAppender.class);
+ APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+ }
+
+ if (appenderClass == null) {
+ throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+ }
+
+ appender = newAppenderInstance(appenderClass, conf, meta, path);
+
+ return appender;
+ }
+
+
+ public TableMeta getTableMeta(Path tablePath) throws IOException {
+ TableMeta meta;
+
+ FileSystem fs = tablePath.getFileSystem(conf);
+ Path tableMetaPath = new Path(tablePath, ".meta");
+ if (!fs.exists(tableMetaPath)) {
+ throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
+ }
+
+ FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
+
+ CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
+ CatalogProtos.TableProto.getDefaultInstance());
+ meta = new TableMetaImpl(tableProto);
+
+ return meta;
+ }
+
  /** Splits the named warehouse table using the file system's default block size. */
  public Fragment[] split(String tableName) throws IOException {
    Path tablePath = new Path(tableBaseDir, tableName);
    return split(tableName, tablePath, fs.getDefaultBlockSize());
  }

  /** Splits the named warehouse table into fragments of at most {@code fragmentSize} bytes. */
  public Fragment[] split(String tableName, long fragmentSize) throws IOException {
    Path tablePath = new Path(tableBaseDir, tableName);
    return split(tableName, tablePath, fragmentSize);
  }

  /**
   * Builds one whole-file fragment per file in the table directory without
   * splitting, for broadcast (replicated) tables.
   */
  public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);
    TableMeta meta = getTableMeta(tablePath);
    List<Fragment> listTablets = new ArrayList<Fragment>();
    Fragment tablet;

    FileStatus[] fileLists = fs.listStatus(tablePath);
    for (FileStatus file : fileLists) {
      tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
      listTablets.add(tablet);
    }

    Fragment[] tablets = new Fragment[listTablets.size()];
    listTablets.toArray(tablets);

    return tablets;
  }

  /** Splits the table at {@code tablePath}; the table name is the directory name. */
  public Fragment[] split(Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);
    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
  }

  /** Splits the table at {@code tablePath} under the given name, using the default block size. */
  public Fragment[] split(String tableName, Path tablePath) throws IOException {
    return split(tableName, tablePath, fs.getDefaultBlockSize());
  }
+
+ private Fragment[] split(String tableName, Path tablePath, long size)
+ throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ TableMeta meta = getTableMeta(tablePath);
+ long defaultBlockSize = size;
+ List<Fragment> listTablets = new ArrayList<Fragment>();
+ Fragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
+ } else {
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
+ }
+ }
+
+ Fragment[] tablets = new Fragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
  /**
   * Splits the files under {@code tablePath} into fragments of at most
   * {@code size} bytes. Static variant of the private {@code split()} that
   * takes the table meta explicitly instead of reading the {@code .meta} file.
   * NOTE(review): this duplicates the chunking loop of {@code split(String,
   * Path, long)}; consider sharing the implementation.
   */
  public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
                                   Path tablePath, long size)
      throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);

    long defaultBlockSize = size;
    List<Fragment> listTablets = new ArrayList<Fragment>();
    Fragment tablet;

    FileStatus[] fileLists = fs.listStatus(tablePath);
    for (FileStatus file : fileLists) {
      long remainFileSize = file.getLen();
      long start = 0;
      if (remainFileSize > defaultBlockSize) {
        // Chop the file into fixed-size chunks, then append the remainder.
        while (remainFileSize > defaultBlockSize) {
          tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
          listTablets.add(tablet);
          start += defaultBlockSize;
          remainFileSize -= defaultBlockSize;
        }
        listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
      } else {
        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
      }
    }

    Fragment[] tablets = new Fragment[listTablets.size()];
    listTablets.toArray(tablets);

    return tablets;
  }
+
+ public void writeTableMeta(Path tableRoot, TableMeta meta)
+ throws IOException {
+ FileSystem fs = tableRoot.getFileSystem(conf);
+ FSDataOutputStream out = fs.create(new Path(tableRoot, ".meta"));
+ FileUtil.writeProto(out, meta.getProto());
+ out.flush();
+ out.close();
+ }
+
+ public long calculateSize(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ long totalSize = 0;
+
+ if (fs.exists(tablePath)) {
+ for (FileStatus status : fs.listStatus(tablePath)) {
+ totalSize += status.getLen();
+ }
+ }
+
+ return totalSize;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // FileInputFormat Area
+ /////////////////////////////////////////////////////////////////////////////
+
  // Accepts only visible data files: names beginning with '_' (e.g. Hadoop
  // _SUCCESS markers) or '.' (e.g. the table's .meta file) are skipped.
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };
+
+ /**
+ * Proxy PathFilter that accepts a path only if all filters given in the
+ * constructor do. Used by the listPaths() to apply the built-in
+ * hiddenFileFilter together with a user provided one (if any).
+ */
+ private static class MultiPathFilter implements PathFilter {
+ private List<PathFilter> filters;
+
+ public MultiPathFilter(List<PathFilter> filters) {
+ this.filters = filters;
+ }
+
+ public boolean accept(Path path) {
+ for (PathFilter filter : filters) {
+ if (!filter.accept(path)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * List input directories.
+ * Subclasses may override to, e.g., select only files matching a regular
+ * expression.
+ *
+ * @return array of FileStatus objects
+ * @throws IOException if zero items.
+ */
+ protected List<FileStatus> listStatus(Path path) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ Path[] dirs = new Path[]{path};
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+
+ List<IOException> errors = new ArrayList<IOException>();
+
+ // creates a MultiPathFilter with the hiddenFileFilter and the
+ // user provided one (if any).
+ List<PathFilter> filters = new ArrayList<PathFilter>();
+ filters.add(hiddenFileFilter);
+
+ PathFilter inputFilter = new MultiPathFilter(filters);
+
+ for (int i = 0; i < dirs.length; ++i) {
+ Path p = dirs[i];
+
+ FileSystem fs = p.getFileSystem(conf);
+ FileStatus[] matches = fs.globStatus(p, inputFilter);
+ if (matches == null) {
+ errors.add(new IOException("Input path does not exist: " + p));
+ } else if (matches.length == 0) {
+ errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+ } else {
+ for (FileStatus globStat : matches) {
+ if (globStat.isDirectory()) {
+ for (FileStatus stat : fs.listStatus(globStat.getPath(),
+ inputFilter)) {
+ result.add(stat);
+ }
+ } else {
+ result.add(globStat);
+ }
+ }
+ }
+ }
+
+ if (!errors.isEmpty()) {
+ throw new InvalidInputException(errors);
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
  /**
   * Get the lower bound on split size imposed by the format.
   * Subclasses may override to enforce a larger minimum.
   *
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }
+
  /**
   * Is the given filename splitable? Usually, true, but if the file is
   * stream compressed, it will not be.
   * <p/>
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that Mappers process entire files.
   *
   * @param meta the table meta identifying the storage format
   * @param filename the file name to check
   * @return is this file isSplittable?
   */
  protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
    // A scanner is created only to ask whether the format supports splitting.
    // NOTE(review): the scanner is never closed here; presumably construction
    // alone acquires no resources -- confirm against the scanner impls.
    Scanner scanner = getScanner(meta, filename);
    return scanner.isSplittable();
  }
+
  /** Clamps the block size between the given minimum and maximum split sizes. */
  @Deprecated
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  // NOTE(review): this 10% slop factor is not referenced anywhere in this
  // class; it appears to be a leftover from the Hadoop FileInputFormat code
  // this section was derived from.
  @Deprecated
  private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+ @Deprecated
+ protected int getBlockIndex(BlockLocation[] blkLocations,
+ long offset) {
+ for (int i = 0; i < blkLocations.length; i++) {
+ // is the offset inside this block?
+ if ((blkLocations[i].getOffset() <= offset) &&
+ (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+ return i;
+ }
+ }
+ BlockLocation last = blkLocations[blkLocations.length - 1];
+ long fileLength = last.getOffset() + last.getLength() - 1;
+ throw new IllegalArgumentException("Offset " + offset +
+ " is outside of file (0.." +
+ fileLength + ")");
+ }
+
  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types
   */
  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
    return new Fragment(fragmentId, file, meta, start, length);
  }

  /**
   * Variant that carries the block's hosts and per-host disk ids so the
   * scheduler can place the split near its data.
   */
  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
                               int[] diskIds) throws IOException {
    return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
  }
+
+ // for Non Splittable. eg, compressed gzip TextFile
+ protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+ BlockLocation[] blkLocations) throws IOException {
+
+ Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+ for (BlockLocation blockLocation : blkLocations) {
+ for (String host : blockLocation.getHosts()) {
+ if (hostsBlockMap.containsKey(host)) {
+ hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+ } else {
+ hostsBlockMap.put(host, 1);
+ }
+ }
+ }
+
+ List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+ @Override
+ public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+ return v1.getValue().compareTo(v2.getValue());
+ }
+ });
+
+ String[] hosts = new String[blkLocations[0].getHosts().length];
+ int[] hostsBlockCount = new int[blkLocations[0].getHosts().length];
+
+ for (int i = 0; i < hosts.length; i++) {
+ Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+ hosts[i] = entry.getKey();
+ hostsBlockCount[i] = entry.getValue();
+ }
+ return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
+ }
+
  /**
   * Get the maximum split size.
   *
   * @return the maximum number of bytes a split can include
   */
  @Deprecated
  public static long getMaxSplitSize() {
    // TODO - to be configurable
    return 536870912L; // 512 MB
  }

  /**
   * Get the minimum split size
   *
   * @return the minimum number of bytes that can be in a split
   */
  @Deprecated
  public static long getMinSplitSize() {
    // TODO - to be configurable
    return 67108864L; // 64 MB
  }
+
+ /**
+ * Get Disk Ids by Volume Bytes
+ */
+ private int[] getDiskIds(VolumeId[] volumeIds) {
+ int[] diskIds = new int[volumeIds.length];
+ for (int i = 0; i < volumeIds.length; i++) {
+ int diskId = -1;
+ if (volumeIds[i] != null && volumeIds[i].isValid()) {
+ String volumeIdString = volumeIds[i].toString();
+ byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
+
+ if (volumeIdBytes.length == 4) {
+ diskId = Bytes.toInt(volumeIdBytes);
+ } else if (volumeIdBytes.length == 1) {
+ diskId = (int) volumeIdBytes[0]; // support hadoop-2.0.2
+ }
+ }
+ diskIds[i] = diskId;
+ }
+ return diskIds;
+ }
+
+ /**
+ * Generate the map of host and make them into Volume Ids.
+ *
+ */
+ private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
+ Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+ for (Fragment frag : frags) {
+ String[] hosts = frag.getHosts();
+ int[] diskIds = frag.getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ Set<Integer> volumeList = volumeMap.get(hosts[i]);
+ if (volumeList == null) {
+ volumeList = new HashSet<Integer>();
+ volumeMap.put(hosts[i], volumeList);
+ }
+
+ if (diskIds.length > 0 && diskIds[i] > -1) {
+ volumeList.add(diskIds[i]);
+ }
+ }
+ }
+
+ return volumeMap;
+ }
  /**
   * Generate the list of files and make them into FileSplits.
   *
   * For each non-empty file: when the format is splittable, one fragment is
   * created per HDFS block; otherwise a single fragment covers the whole
   * file. When block-to-disk metadata is available on a DistributedFileSystem,
   * fragments also carry disk (volume) ids for disk-aware scheduling.
   * Zero-length files yield one empty fragment.
   *
   * @throws IOException
   */
  public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
    // generate splits'

    List<Fragment> splits = new ArrayList<Fragment>();
    List<FileStatus> files = listStatus(inputPath);
    FileSystem fs = inputPath.getFileSystem(conf);
    for (FileStatus file : files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length > 0) {
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
        boolean splittable = isSplittable(meta, path);
        if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
          // supported disk volume
          BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
              .getFileBlockStorageLocations(Arrays.asList(blkLocations));
          if (splittable) {
            for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
              splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
                  .getVolumeIds())));
            }
          } else { // Non splittable
            splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
          }

        } else {
          if (splittable) {
            for (BlockLocation blockLocation : blkLocations) {
              splits.add(makeSplit(tableName, meta, path, blockLocation, null));
            }
          } else { // Non splittable
            splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
          }
        }
      } else {
        //for zero length files
        splits.add(makeSplit(tableName, meta, path, 0, length));
      }
    }

    LOG.info("Total # of splits: " + splits.size());
    return splits;
  }
+
+ private class InvalidInputException extends IOException {
+ public InvalidInputException(
+ List<IOException> errors) {
+ }
+ }
+
  // Constructor signature expected of every Scanner implementation:
  // (Configuration, TableMeta, Fragment).
  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
      Configuration.class,
      TableMeta.class,
      Fragment.class
  };

  // Constructor signature expected of every FileAppender implementation:
  // (Configuration, TableMeta, Path).
  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
      Configuration.class,
      TableMeta.class,
      Path.class
  };
+
+ /**
+ * create a scanner instance.
+ */
+ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta,
+ Fragment fragment) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, meta, fragment});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+
  /**
   * create an appender instance.
   * Instantiates it through its (Configuration, TableMeta, Path) constructor,
   * caching the resolved constructor per class.
   */
  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta,
                                          Path path) {
    T result;
    try {
      // Reuse a cached (already made accessible) constructor when available.
      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
      if (meth == null) {
        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
        meth.setAccessible(true);
        CONSTRUCTOR_CACHE.put(theClass, meth);
      }
      result = meth.newInstance(new Object[]{conf, meta, path});
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    return result;
  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
index 4f6dde1..6c31247 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
@@ -23,15 +23,17 @@ import com.google.gson.Gson;
import com.google.gson.annotations.Expose;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.json.GsonObject;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.json.GsonObject;
import org.apache.tajo.storage.json.StorageGsonHelper;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject, GsonObject {
protected FragmentProto.Builder builder = null;
@@ -44,8 +46,8 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
@Expose private boolean distCached = false; // optional
private String[] hosts; // Datanode hostnames
- private int[] hostsBlockCount; // list of block count of hosts
- private int[] diskIds;
+ @Expose private int[] hostsBlockCount; // list of block count of hosts
+ @Expose private int[] diskIds;
public Fragment() {
builder = FragmentProto.newBuilder();
@@ -53,13 +55,13 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
public Fragment(String tableName, Path uri, TableMeta meta, BlockLocation blockLocation, int[] diskIds) throws IOException {
this();
- TableMeta newMeta = new TableMetaImpl(meta.getProto());
+ //TableMeta newMeta = new TableMetaImpl(meta.getProto());
+ TableMeta newMeta = meta;
SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
.getSchema().getProto());
newMeta.setSchema(new Schema(newSchemaProto));
- this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength());
- this.hosts = blockLocation.getHosts();
- this.diskIds = diskIds;
+ this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength(),
+ blockLocation.getHosts(), diskIds);
}
// Non splittable
@@ -69,7 +71,7 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
.getSchema().getProto());
newMeta.setSchema(new Schema(newSchemaProto));
- this.set(tableName, uri, newMeta, start, length);
+ this.set(tableName, uri, newMeta, start, length, null, null);
this.hosts = hosts;
this.hostsBlockCount = hostsBlockCount;
}
@@ -80,26 +82,35 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(fragmentId, meta
.getSchema().getProto());
newMeta.setSchema(new Schema(newSchemaProto));
- this.set(fragmentId, path, newMeta, start, length);
+ this.set(fragmentId, path, newMeta, start, length, null, null);
}
public Fragment(FragmentProto proto) {
this();
TableMeta newMeta = new TableMetaImpl(proto.getMeta());
+ int[] diskIds = new int[proto.getDiskIdsList().size()];
+ int i = 0;
+ for(Integer eachValue: proto.getDiskIdsList()) {
+ diskIds[i++] = eachValue;
+ }
this.set(proto.getId(), new Path(proto.getPath()), newMeta,
- proto.getStartOffset(), proto.getLength());
+ proto.getStartOffset(), proto.getLength(),
+ proto.getHostsList().toArray(new String[]{}),
+ diskIds);
if (proto.hasDistCached() && proto.getDistCached()) {
distCached = true;
}
}
private void set(String tableName, Path path, TableMeta meta, long start,
- long length) {
+ long length, String[] hosts, int[] diskIds) {
this.tableName = tableName;
this.uri = path;
this.meta = meta;
this.startOffset = start;
this.length = length;
+ this.hosts = hosts;
+ this.diskIds = diskIds;
}
@@ -234,6 +245,9 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
frag.uri = uri;
frag.meta = (TableMeta) (meta != null ? meta.clone() : null);
frag.distCached = distCached;
+ frag.diskIds = diskIds;
+ frag.hosts = hosts;
+ frag.hostsBlockCount = hostsBlockCount;
return frag;
}
@@ -256,6 +270,17 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
builder.setLength(this.length);
builder.setPath(this.uri.toString());
builder.setDistCached(this.distCached);
+ if(diskIds != null) {
+ List<Integer> idList = new ArrayList<Integer>();
+ for(int eachId: diskIds) {
+ idList.add(eachId);
+ }
+ builder.addAllDiskIds(idList);
+ }
+
+ if(hosts != null) {
+ builder.addAllHosts(TUtil.newList(hosts));
+ }
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index a7a1e4a..582c64f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
import java.io.IOException;
import java.util.ArrayList;
@@ -61,7 +62,7 @@ public class MergeScanner implements Scanner {
currentScanner.close();
}
currentFragment = iterator.next();
- currentScanner = StorageManager.getScanner(conf, meta, currentFragment);
+ currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
currentScanner.init();
return currentScanner.next();
} else {
@@ -74,7 +75,7 @@ public class MergeScanner implements Scanner {
iterator = fragments.iterator();
if (iterator.hasNext()) {
currentFragment = iterator.next();
- currentScanner = StorageManager.getScanner(conf, meta, currentFragment);
+ currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
}
}
[4/4] git commit: TAJO-178: Implements StorageManager for scanning
asynchronously. (hyoungjunkim via hyunsik)
Posted by hy...@apache.org.
TAJO-178: Implements StorageManager for scanning asynchronously. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5ad7fbae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5ad7fbae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5ad7fbae
Branch: refs/heads/master
Commit: 5ad7fbae98e45307cde51bc58d54e4de95b627ad
Parents: 5d3966a
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Sep 13 11:45:21 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Sep 13 11:53:20 2013 +0900
----------------------------------------------------------------------
pom.xml | 1 +
.../src/main/proto/CatalogProtos.proto | 2 +
tajo-core/tajo-core-backend/pom.xml | 1 +
.../main/java/org/apache/tajo/cli/TajoCli.java | 80 ++-
.../engine/planner/PhysicalPlannerImpl.java | 8 +-
.../planner/physical/BSTIndexScanExec.java | 4 +-
.../planner/physical/ExternalSortExec.java | 2 +-
.../planner/physical/IndexedStoreExec.java | 4 +-
.../planner/physical/PartitionedStoreExec.java | 9 +-
.../engine/planner/physical/ProjectionExec.java | 1 +
.../engine/planner/physical/SeqScanExec.java | 6 +-
.../engine/planner/physical/StoreTableExec.java | 13 +-
.../org/apache/tajo/master/GlobalEngine.java | 6 +-
.../org/apache/tajo/master/GlobalPlanner.java | 6 +-
.../java/org/apache/tajo/master/TajoMaster.java | 18 +-
.../apache/tajo/master/TaskSchedulerImpl.java | 21 +-
.../apache/tajo/master/querymaster/Query.java | 13 +-
.../master/querymaster/QueryInProgress.java | 2 +
.../master/querymaster/QueryJobManager.java | 10 +
.../tajo/master/querymaster/QueryMaster.java | 39 +-
.../master/querymaster/QueryMasterTask.java | 4 +-
.../tajo/master/querymaster/QueryUnit.java | 39 +-
.../tajo/master/querymaster/SubQuery.java | 8 +-
.../master/rm/TajoWorkerResourceManager.java | 31 +-
.../apache/tajo/master/rm/WorkerResource.java | 47 ++
.../java/org/apache/tajo/webapp/HttpServer.java | 15 +-
.../org/apache/tajo/worker/TajoQueryEngine.java | 9 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 53 +-
.../tajo/worker/TajoWorkerManagerService.java | 4 +-
.../main/java/org/apache/tajo/worker/Task.java | 1 -
.../apache/tajo/worker/TaskRunnerManager.java | 6 +
.../src/main/proto/TajoMasterProtocol.proto | 11 +
.../resources/webapps/admin/WEB-INF/web.xml | 10 +
.../src/main/resources/webapps/admin/index.jsp | 69 +-
.../src/main/resources/webapps/admin/query.jsp | 38 ++
.../resources/webapps/worker/WEB-INF/web.xml | 10 +
.../src/main/resources/webapps/worker/index.jsp | 60 ++
.../resources/webapps/worker/querydetail.jsp | 60 ++
.../src/main/resources/webapps/worker/style.css | 285 ++++++++
.../org/apache/tajo/BackendTestingUtil.java | 34 +-
.../org/apache/tajo/TajoTestingCluster.java | 3 +-
.../org/apache/tajo/client/TestTajoClient.java | 26 +-
.../plan/global/TestGlobalQueryPlanner.java | 8 +-
.../global/TestGlobalQueryOptimizer.java | 6 +-
.../planner/physical/TestBNLJoinExec.java | 8 +-
.../planner/physical/TestBSTIndexExec.java | 12 +-
.../planner/physical/TestExternalSortExec.java | 7 +-
.../planner/physical/TestHashAntiJoinExec.java | 8 +-
.../planner/physical/TestHashJoinExec.java | 8 +-
.../planner/physical/TestHashSemiJoinExec.java | 8 +-
.../planner/physical/TestMergeJoinExec.java | 8 +-
.../engine/planner/physical/TestNLJoinExec.java | 11 +-
.../planner/physical/TestPhysicalPlanner.java | 18 +-
.../engine/planner/physical/TestSortExec.java | 6 +-
.../tajo/engine/query/TestResultSetImpl.java | 21 +-
.../tajo/master/TestExecutionBlockCursor.java | 5 +-
.../org/apache/tajo/storage/TestRowFile.java | 39 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 18 +-
.../tajo/storage/AbstractStorageManager.java | 669 +++++++++++++++++++
.../java/org/apache/tajo/storage/Fragment.java | 47 +-
.../org/apache/tajo/storage/MergeScanner.java | 5 +-
.../org/apache/tajo/storage/StorageManager.java | 658 +-----------------
.../tajo/storage/StorageManagerFactory.java | 96 +++
.../apache/tajo/storage/v2/CSVFileScanner.java | 383 +++++++++++
.../apache/tajo/storage/v2/DiskDeviceInfo.java | 62 ++
.../tajo/storage/v2/DiskFileScanScheduler.java | 168 +++++
.../org/apache/tajo/storage/v2/DiskInfo.java | 75 +++
.../apache/tajo/storage/v2/DiskMountInfo.java | 86 +++
.../org/apache/tajo/storage/v2/DiskUtil.java | 198 ++++++
.../apache/tajo/storage/v2/FileScanRunner.java | 75 +++
.../apache/tajo/storage/v2/FileScannerV2.java | 253 +++++++
.../apache/tajo/storage/v2/RCFileScanner.java | 256 +++++++
.../apache/tajo/storage/v2/ScanScheduler.java | 148 ++++
.../tajo/storage/v2/StorageManagerV2.java | 135 ++++
.../src/main/resources/storage-default.xml | 49 ++
.../tajo/storage/TestCompressionStorages.java | 10 +-
.../apache/tajo/storage/TestMergeScanner.java | 18 +-
.../apache/tajo/storage/TestStorageManager.java | 14 +-
.../org/apache/tajo/storage/TestStorages.java | 14 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 47 +-
.../index/TestSingleCSVFileBSTIndex.java | 10 +-
.../src/test/resources/storage-default.xml | 30 +
82 files changed, 3758 insertions(+), 1008 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3d50036..136a5f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -312,6 +312,7 @@
<exclude>**/*.schema</exclude>
<exclude>**/*.tbl</exclude>
<exclude>**/*.jsp</exclude>
+ <exclude>**/web.xml</exclude>
<!-- generated content -->
<exclude>**/target/**</exclude>
<exclude>**/*.log</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 6164553..3d61ebb 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -88,6 +88,8 @@ message FragmentProto {
required TableProto meta = 5;
optional TableStatProto stat = 6;
optional bool distCached = 7 [default = false];
+ repeated string hosts = 8;
+ repeated int32 diskIds = 9;
}
message TableProto {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index 9c1d458..78b7e6e 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -67,6 +67,7 @@
<systemProperties>
<tajo.test>TRUE</tajo.test>
</systemProperties>
+ <argLine>-Xms512m -Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index c273f6f..ced5e5e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -351,49 +351,55 @@ public class TajoCli {
if (status.getState() == QueryState.QUERY_SUCCEEDED) {
if (status.hasResult()) {
ResultSet res = client.getQueryResult(queryId);
- if (res == null) {
- sout.println("OK");
- return;
- }
-
- ResultSetMetaData rsmd = res.getMetaData();
- TableDesc desc = client.getResultDesc(queryId);
- sout.println("final state: " + status.getState()
- + ", init time: " + (((float)(status.getInitTime() - status.getSubmitTime())
- / 1000.0) + " sec")
- + ", execution time: " + (((float)status.getFinishTime() - status.getInitTime())
- / 1000.0) + " sec"
- + ", total response time: " + (((float)(status.getFinishTime() -
- status.getSubmitTime()) / 1000.0) + " sec"));
- sout.println("result: " + desc.getPath() + "\n");
-
- int numOfColumns = rsmd.getColumnCount();
- for (int i = 1; i <= numOfColumns; i++) {
- if (i > 1) sout.print(", ");
- String columnName = rsmd.getColumnName(i);
- sout.print(columnName);
- }
- sout.println("\n-------------------------------");
+ try {
+ if (res == null) {
+ sout.println("OK");
+ return;
+ }
- int numOfPrintedRows = 0;
- while (res.next()) {
- // TODO - to be improved to print more formatted text
+ ResultSetMetaData rsmd = res.getMetaData();
+ TableDesc desc = client.getResultDesc(queryId);
+ sout.println("final state: " + status.getState()
+ + ", init time: " + (((float)(status.getInitTime() - status.getSubmitTime())
+ / 1000.0) + " sec")
+ + ", execution time: " + (((float)status.getFinishTime() - status.getInitTime())
+ / 1000.0) + " sec"
+ + ", total response time: " + (((float)(status.getFinishTime() -
+ status.getSubmitTime()) / 1000.0) + " sec"));
+ sout.println("result: " + desc.getPath() + "\n");
+
+ int numOfColumns = rsmd.getColumnCount();
for (int i = 1; i <= numOfColumns; i++) {
if (i > 1) sout.print(", ");
- String columnValue = res.getObject(i).toString();
- sout.print(columnValue);
+ String columnName = rsmd.getColumnName(i);
+ sout.print(columnName);
}
- sout.println();
- sout.flush();
- numOfPrintedRows++;
- if (numOfPrintedRows >= PRINT_LIMIT) {
- sout.print("continue... ('q' is quit)");
- sout.flush();
- if (sin.read() == 'q') {
- break;
+ sout.println("\n-------------------------------");
+
+ int numOfPrintedRows = 0;
+ while (res.next()) {
+ // TODO - to be improved to print more formatted text
+ for (int i = 1; i <= numOfColumns; i++) {
+ if (i > 1) sout.print(", ");
+ String columnValue = res.getObject(i).toString();
+ sout.print(columnValue);
}
- numOfPrintedRows = 0;
sout.println();
+ sout.flush();
+ numOfPrintedRows++;
+ if (numOfPrintedRows >= PRINT_LIMIT) {
+ sout.print("continue... ('q' is quit)");
+ sout.flush();
+ if (sin.read() == 'q') {
+ break;
+ }
+ numOfPrintedRows = 0;
+ sout.println();
+ }
+ }
+ } finally {
+ if(res != null) {
+ res.close();
}
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2d123bd..7d68e19 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -32,8 +32,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.Fragment;
-import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.IndexUtil;
@@ -42,9 +42,9 @@ import java.io.IOException;
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
protected final TajoConf conf;
- protected final StorageManager sm;
+ protected final AbstractStorageManager sm;
- public PhysicalPlannerImpl(final TajoConf conf, final StorageManager sm) {
+ public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
this.conf = conf;
this.sm = sm;
}
@@ -227,7 +227,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new TunnelExec(ctx, plan.getOutSchema(), subOp);
}
- return new StoreTableExec(ctx, sm, plan, subOp);
+ return new StoreTableExec(ctx, plan, subOp);
}
public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index a0d69e0..fe7bd44 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -47,7 +47,7 @@ public class BSTIndexScanExec extends PhysicalExec {
private boolean initialize = true;
public BSTIndexScanExec(TaskAttemptContext context,
- StorageManager sm , ScanNode scanNode ,
+ AbstractStorageManager sm , ScanNode scanNode ,
Fragment fragment, Path fileName , Schema keySchema,
TupleComparator comparator , Datum[] datum) throws IOException {
super(context, scanNode.getInSchema(), scanNode.getOutSchema());
@@ -60,7 +60,7 @@ public class BSTIndexScanExec extends PhysicalExec {
}
this.datum = datum;
- this.fileScanner = (SeekableScanner)StorageManager.getScanner(context.getConf(),
+ this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
fragment.getMeta(), fragment, outSchema);
this.fileScanner.init();
this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 7ba0d8a..6d49880 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -45,7 +45,7 @@ public class ExternalSortExec extends SortExec {
private int SORT_BUFFER_SIZE;
public ExternalSortExec(final TaskAttemptContext context,
- final StorageManager sm, final SortNode plan, final PhysicalExec child)
+ final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child,
plan.getSortKeys());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index dc7f08c..f68ff59 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -44,7 +44,7 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
private FileAppender appender;
private TableMeta meta;
- public IndexedStoreExec(final TaskAttemptContext context, final StorageManager sm,
+ public IndexedStoreExec(final TaskAttemptContext context, final AbstractStorageManager sm,
final PhysicalExec child, final Schema inSchema, final Schema outSchema,
final SortSpec[] sortSpecs) throws IOException {
super(context, inSchema, outSchema, child);
@@ -71,7 +71,7 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
.newTableMeta(this.outSchema, CatalogProtos.StoreType.CSV);
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
- this.appender = (FileAppender) StorageManager.getAppender(context.getConf(), meta,
+ this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
new Path(storeTablePath, "output"));
this.appender.enableStats();
this.appender.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
index f70bf18..a249edc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -33,10 +33,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.*;
import java.io.IOException;
import java.text.NumberFormat;
@@ -64,7 +61,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
private final Path storeTablePath;
private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
- public PartitionedStoreExec(TaskAttemptContext context, final StorageManager sm,
+ public PartitionedStoreExec(TaskAttemptContext context, final AbstractStorageManager sm,
final StoreTableNode plan, final PhysicalExec child) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
Preconditions.checkArgument(plan.hasPartitionKey());
@@ -101,7 +98,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
FileStatus status = fs.getFileStatus(dataFile);
LOG.info("File size: " + status.getLen());
}
- appender = StorageManager.getAppender(context.getConf(), meta, dataFile);
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, dataFile);
appender.enableStats();
appender.init();
appenderMap.put(partition, appender);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index 4aa8a1c..bafeeea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -55,6 +55,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
@Override
public Tuple next() throws IOException {
Tuple tuple = child.next();
+
if (tuple == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 474415e..9051219 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -46,7 +46,7 @@ public class SeqScanExec extends PhysicalExec {
private Projector projector;
private EvalContext [] evalContexts;
- public SeqScanExec(TaskAttemptContext context, StorageManager sm,
+ public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
ScanNode plan, Fragment[] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -91,8 +91,8 @@ public class SeqScanExec extends PhysicalExec {
this.scanner = new MergeScanner(context.getConf(), fragments[0].getMeta(),
TUtil.newList(fragments));
} else {
- this.scanner = StorageManager.getScanner(context.getConf(), fragments[0].getMeta(),
- fragments[0], projected);
+ this.scanner = StorageManagerFactory.getStorageManager(
+ context.getConf()).getScanner(fragments[0].getMeta(), fragments[0], projected);
}
scanner.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 7d6a525..a799694 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -28,10 +28,7 @@ import org.apache.tajo.TaskAttemptContext;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.*;
import java.io.IOException;
@@ -47,10 +44,8 @@ public class StoreTableExec extends UnaryPhysicalExec {
* @throws java.io.IOException
*
*/
- public StoreTableExec(TaskAttemptContext context, StorageManager sm,
- StoreTableNode plan, PhysicalExec child) throws IOException {
+ public StoreTableExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
-
this.plan = plan;
}
@@ -68,10 +63,10 @@ public class StoreTableExec extends UnaryPhysicalExec {
Path storeTablePath = new Path(context.getWorkDir(), "out");
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
- appender = StorageManager.getAppender(context.getConf(), meta,
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
StorageUtil.concatPath(storeTablePath, "0"));
} else {
- appender = StorageManager.getAppender(context.getConf(), meta, context.getOutputPath());
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, context.getOutputPath());
}
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 76f8571..2ddd891 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -49,7 +49,7 @@ import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageUtil;
import java.io.IOException;
@@ -65,7 +65,7 @@ public class GlobalEngine extends AbstractService {
private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
private final MasterContext context;
- private final StorageManager sm;
+ private final AbstractStorageManager sm;
private SQLAnalyzer analyzer;
private HiveConverter converter;
@@ -107,7 +107,7 @@ public class GlobalEngine extends AbstractService {
NoSuchQueryIdException, IllegalQueryStatusException,
UnknownWorkerException, EmptyClusterException {
- LOG.info("SQL: " + sql);
+ LOG.info(">>>>>SQL: " + sql);
try {
// setting environment variables
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index d58d4db..7c374d2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -32,7 +32,7 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.master.ExecutionBlock.PartitionType;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
@@ -47,11 +47,11 @@ public class GlobalPlanner {
private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
private TajoConf conf;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private QueryId queryId;
public GlobalPlanner(final TajoConf conf,
- final StorageManager sm,
+ final AbstractStorageManager sm,
final EventHandler eventHandler)
throws IOException {
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 01f312c..0635156 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -47,7 +47,8 @@ import org.apache.tajo.engine.function.builtin.*;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.master.rm.YarnTajoResourceManager;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.webapp.StaticHttpServer;
@@ -87,7 +88,7 @@ public class TajoMaster extends CompositeService {
private CatalogServer catalogServer;
private CatalogService catalog;
- private StorageManager storeManager;
+ private AbstractStorageManager storeManager;
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
@@ -121,7 +122,7 @@ public class TajoMaster extends CompositeService {
// check the system directory and create if they are not created.
checkAndInitializeSystemDirectories();
- this.storeManager = new StorageManager(systemConf);
+ this.storeManager = StorageManagerFactory.getStorageManager(systemConf);
catalogServer = new CatalogServer(initBuiltinFunctions());
addIfService(catalogServer);
@@ -140,7 +141,7 @@ public class TajoMaster extends CompositeService {
addIfService(tajoMasterService);
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error(e.getMessage(), e);
}
super.init(systemConf);
@@ -157,7 +158,8 @@ public class TajoMaster extends CompositeService {
}
private void initWebServer() throws Exception {
- webServer = StaticHttpServer.getInstance(this ,"admin", null, 8080 ,
+ int httpPort = systemConf.getInt("tajo.master.http.port", 8080);
+ webServer = StaticHttpServer.getInstance(this ,"admin", null, httpPort ,
true, null, context.getConf(), null);
webServer.start();
}
@@ -341,7 +343,7 @@ public class TajoMaster extends CompositeService {
} finally {
out.close();
}
- defaultFS.setReplication(systemConfPath, (short)systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
+ defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
}
@Override
@@ -368,7 +370,7 @@ public class TajoMaster extends CompositeService {
return this.catalog;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return this.storeManager;
}
@@ -407,7 +409,7 @@ public class TajoMaster extends CompositeService {
return globalEngine;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return storeManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 71b0114..574122b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -173,12 +173,12 @@ public class TaskSchedulerImpl extends AbstractService
if (taskRequests.size() > 0) {
if (scheduledRequests.leafTaskNum() > 0) {
- LOG.info("Try to schedule tasks with taskRequestEvents: " +
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
taskRequests.size() + ", LeafTask Schedule Request: " +
scheduledRequests.leafTaskNum());
taskRequests.getTaskRequests(taskRequestEvents,
scheduledRequests.leafTaskNum());
- LOG.info("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+ LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
if (taskRequestEvents.size() > 0) {
scheduledRequests.assignToLeafTasks(taskRequestEvents);
taskRequestEvents.clear();
@@ -188,7 +188,7 @@ public class TaskSchedulerImpl extends AbstractService
if (taskRequests.size() > 0) {
if (scheduledRequests.nonLeafTaskNum() > 0) {
- LOG.info("Try to schedule tasks with taskRequestEvents: " +
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
taskRequests.size() + ", NonLeafTask Schedule Request: " +
scheduledRequests.nonLeafTaskNum());
taskRequests.getTaskRequests(taskRequestEvents,
@@ -390,17 +390,18 @@ public class TaskSchedulerImpl extends AbstractService
public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
+ Collections.shuffle(taskRequests);
Iterator<TaskRequestEvent> it = taskRequests.iterator();
TaskRequestEvent taskRequest;
while (it.hasNext() && leafTasks.size() > 0) {
taskRequest = it.next();
- LOG.info("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+ LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
"containerId=" + taskRequest.getContainerId());
ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
-
- if(container == null) continue;
-
+ if(container == null) {
+ continue;
+ }
String host = container.getTaskHostName();
QueryUnitAttemptId attemptId = null;
@@ -479,8 +480,8 @@ public class TaskSchedulerImpl extends AbstractService
}
}
- LOG.info("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
- LOG.info("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
+ LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
+ LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
}
public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
@@ -489,7 +490,7 @@ public class TaskSchedulerImpl extends AbstractService
TaskRequestEvent taskRequest;
while (it.hasNext()) {
taskRequest = it.next();
- LOG.info("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+ LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
QueryUnitAttemptId attemptId;
// random allocation
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index c473586..4ba95b0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -39,14 +39,11 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.master.ExecutionBlockCursor;
import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.event.*;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,7 +58,7 @@ public class Query implements EventHandler<QueryEvent> {
private Map<ExecutionBlockId, SubQuery> subqueries;
private final EventHandler eventHandler;
private final MasterPlan plan;
- private final StorageManager sm;
+ private final AbstractStorageManager sm;
QueryMasterTask.QueryMasterTaskContext context;
private ExecutionBlockCursor cursor;
@@ -242,6 +239,10 @@ public class Query implements EventHandler<QueryEvent> {
return this.subqueries.get(id);
}
+ public Collection<SubQuery> getSubQueries() {
+ return Collections.unmodifiableCollection(this.subqueries.values());
+ }
+
public QueryState getState() {
readLock.lock();
try {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 809dce2..c54f8da 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -235,6 +235,8 @@ public class QueryInProgress extends CompositeService {
this.queryInfo.setQueryMasterResource(queryInfo.getQueryMasterResource());
}
this.queryInfo.setQueryState(queryInfo.getQueryState());
+ this.queryInfo.setProgress(queryInfo.getProgress());
+ this.queryInfo.setFinishTime(queryInfo.getFinishTime());
if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
this.queryInfo.setLastMessage(queryInfo.getLastMessage());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 424d5bf..669dee3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -32,6 +32,8 @@ import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -85,6 +87,14 @@ public class QueryJobManager extends CompositeService {
return dispatcher.getEventHandler();
}
+ public Collection<QueryInProgress> getRunningQueries() {
+ return Collections.unmodifiableCollection(runningQueries.values());
+ }
+
+ public Collection<QueryInProgress> getFinishedQueries() {
+ return Collections.unmodifiableCollection(finishedQueries.values());
+ }
+
public QueryInfo createNewQueryJob(QueryContext queryContext, String sql, LogicalRootNode plan) throws Exception {
QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryContext, queryId, sql, plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index d45988c..6611102 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -36,13 +36,11 @@ import org.apache.tajo.master.GlobalPlanner;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.worker.TajoWorker;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
// TODO - when exception, send error status to QueryJobManager
@@ -59,7 +57,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
private GlobalOptimizer globalOptimizer;
- private StorageManager storageManager;
+ private AbstractStorageManager storageManager;
private TajoConf systemConf;
@@ -93,7 +91,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
addIfService(dispatcher);
- this.storageManager = new StorageManager(systemConf);
+ this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
globalPlanner = new GlobalPlanner(systemConf, storageManager, dispatcher.getEventHandler());
globalOptimizer = new GlobalOptimizer();
@@ -185,6 +183,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
return this.queryMasterContext;
}
+ public Collection<QueryMasterTask> getQueryMasterTasks() {
+ return queryMasterTasks.values();
+ }
+
public class QueryMasterContext {
private TajoConf conf;
@@ -204,7 +206,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
return clock;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return storageManager;
}
@@ -272,7 +274,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
public void run() {
LOG.info("Start QueryMaster heartbeat thread");
while(!queryMasterStop.get()) {
- //TODO report all query status
List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
synchronized(queryMasterTasks) {
tempTasks.addAll(queryMasterTasks.values());
@@ -285,6 +286,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
.setState(eachTask.getState())
.setQueryId(eachTask.getQueryId().getProto())
+ .setQueryProgress(eachTask.getQuery().getProgress())
+ .setQueryFinishTime(eachTask.getQuery().getFinishTime())
.build();
workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
@@ -318,15 +321,17 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
for(QueryMasterTask eachTask: tempTasks) {
- try {
- long lastHeartbeat = eachTask.getLastClientHeartbeat();
- long time = System.currentTimeMillis() - lastHeartbeat;
- if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
- LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
- eachTask.expiredSessionTimeout();
+ if(!eachTask.isStopped()) {
+ try {
+ long lastHeartbeat = eachTask.getLastClientHeartbeat();
+ long time = System.currentTimeMillis() - lastHeartbeat;
+ if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+ LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
+ eachTask.expiredSessionTimeout();
+ }
+ } catch (Exception e) {
+ LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
}
- } catch (Exception e) {
- LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 2212f82..a4fabcf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -44,7 +44,7 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
import org.apache.tajo.worker.YarnResourceAllocator;
@@ -404,7 +404,7 @@ public class QueryMasterTask extends CompositeService {
return queryId;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return queryMasterContext.getStorageManager();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index ec54244..00dcc0b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -75,6 +75,9 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private int failedAttempts;
private int finishedAttempts; // finish are total of success, failed and killed
+ private long launchTime;
+ private long finishTime;
+
private static final StateMachineFactory
<QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
new StateMachineFactory
@@ -84,7 +87,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED)
+ TaskEventType.T_ATTEMPT_LAUNCHED, new AttemptLaunchedTransition())
.addTransition(TaskState.RUNNING, TaskState.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED)
@@ -186,7 +189,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
this.fragMap.put(fragment.getName(), fragment);
setDataLocations(fragment);
}
-
+
+ public String getSucceededHost() {
+ return succeededHost;
+ }
+
public void addFetch(String tableId, String uri) throws URISyntaxException {
this.addFetch(tableId, new URI(uri));
}
@@ -317,7 +324,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.attempts.get(this.lastAttemptId);
}
- protected QueryUnitAttempt getSuccessfulAttempt() {
+ public QueryUnitAttempt getSuccessfulAttempt() {
readLock.lock();
try {
if (null == successfulAttempt) {
@@ -342,6 +349,22 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
}
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public long getRunningTime() {
+ if(finishTime > 0) {
+ return finishTime - launchTime;
+ } else {
+ return System.currentTimeMillis() - launchTime;
+ }
+ }
+
// This is always called in the Write Lock
private void addAndScheduleAttempt() {
// Create new task attempt
@@ -385,14 +408,24 @@ public class QueryUnit implements EventHandler<TaskEvent> {
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
QueryUnitAttempt attempt = task.attempts.get(
attemptEvent.getTaskAttemptId());
+
task.successfulAttempt = attemptEvent.getTaskAttemptId();
task.succeededHost = attempt.getHost();
+ task.finishTime = System.currentTimeMillis();
task.succeededPullServerPort = attempt.getPullServerPort();
task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
SubQueryEventType.SQ_TASK_COMPLETED));
}
}
+ private static class AttemptLaunchedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+ @Override
+ public void transition(QueryUnit task,
+ TaskEvent event) {
+ task.launchTime = System.currentTimeMillis();
+ }
+ }
+
private static class AttemptFailedTransition implements
MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index ac92386..1bf45ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -48,8 +48,8 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.master.*;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.Fragment;
-import org.apache.tajo.storage.StorageManager;
import java.io.IOException;
import java.util.*;
@@ -73,7 +73,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private int priority;
private TableMeta meta;
private EventHandler eventHandler;
- private final StorageManager sm;
+ private final AbstractStorageManager sm;
private TaskSchedulerImpl taskScheduler;
private QueryMasterTask.QueryMasterTaskContext context;
@@ -135,7 +135,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private int completedTaskCount = 0;
- public SubQuery(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock block, StorageManager sm) {
+ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock block, AbstractStorageManager sm) {
this.context = context;
this.block = block;
this.sm = sm;
@@ -230,7 +230,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return this.priority;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return sm;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 7c4bf59..d1a0c96 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -193,23 +193,25 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
class WorkerResourceAllocationThread extends Thread {
@Override
public void run() {
- LOG.info("====>WorkerResourceAllocationThread start");
+ LOG.info("WorkerResourceAllocationThread start");
while(!stopped.get()) {
try {
WorkerResourceRequest resourceRequest = requestQueue.take();
- LOG.info("====> allocateWorkerResources:" +
- (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
- ", required:" + resourceRequest.request.getNumWorks() +
- ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
- ", liveWorkers=" + liveWorkerResources.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocateWorkerResources:" +
+ (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
+ ", required:" + resourceRequest.request.getNumWorks() +
+ ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+ ", liveWorkers=" + liveWorkerResources.size());
+ }
List<WorkerResource> workerResources = chooseWorkers(false,
resourceRequest.request.getMemoryMBSlots(),
resourceRequest.request.getDiskSlots(),
resourceRequest.request.getNumWorks());
- LOG.debug("====> allocateWorkerResources: allocated:" + workerResources.size());
+ LOG.debug("allocateWorkerResources: allocated:" + workerResources.size());
if(workerResources.size() > 0) {
if(resourceRequest.queryMasterRequest) {
@@ -239,10 +241,11 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
LOG.debug("=========================================");
}
- requestQueue.add(resourceRequest);
+ requestQueue.put(resourceRequest);
Thread.sleep(100);
}
} catch(InterruptedException ie) {
+ LOG.error(ie);
}
}
}
@@ -334,13 +337,14 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
queryMasterWorkerResource = queryMasterMap.remove(queryId);
}
}
+
WorkerResource workerResource = new WorkerResource();
workerResource.copyId(queryMasterWorkerResource);
workerResource.setMemoryMBSlots(queryMasterMemoryMB);
workerResource.setDiskSlots(queryMasterDiskSlot);
workerResource.setCpuCoreSlots(0);
releaseWorkerResource(queryId, workerResource);
- LOG.info("released QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
+ LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
}
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
@@ -354,6 +358,10 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
WorkerResource workerResource = allWorkerResourceMap.get(hostAndPort);
workerResource.setLastHeartbeat(System.currentTimeMillis());
workerResource.setWorkerStatus(WorkerStatus.LIVE);
+ workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+ workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+ workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+ workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
} else {
WorkerResource workerResource = new WorkerResource();
workerResource.setAllocatedHost(request.getTajoWorkerHost());
@@ -363,6 +371,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResource.setManagerPort(request.getTajoWorkerPort());
workerResource.setClientPort(request.getTajoWorkerClientPort());
workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
+ workerResource.setHttpPort(request.getTajoWorkerHttpPort());
workerResource.setLastHeartbeat(System.currentTimeMillis());
workerResource.setWorkerStatus(WorkerStatus.LIVE);
@@ -370,6 +379,10 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResource.setMemoryMBSlots(request.getServerStatus().getSystem().getTotalMemoryMB());
workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
+ workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+ workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+ workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+ workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
} else {
workerResource.setMemoryMBSlots(4096);
workerResource.setDiskSlots(4);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index b958761..a1a4c3e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -31,6 +31,7 @@ public class WorkerResource {
private int managerPort;
private int clientPort;
private int pullServerPort;
+ private int httpPort;
private int diskSlots;
private int cpuCoreSlots;
@@ -40,6 +41,12 @@ public class WorkerResource {
private int usedMemoryMBSlots;
private int usedCpuCoreSlots;
+ private long maxHeap;
+ private long freeHeap;
+ private long totalHeap;
+
+ private int numRunningTasks;
+
private boolean queryMasterAllocated;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -251,4 +258,44 @@ public class WorkerResource {
public void setPullServerPort(int pullServerPort) {
this.pullServerPort = pullServerPort;
}
+
+ public int getHttpPort() {
+ return httpPort;
+ }
+
+ public void setHttpPort(int httpPort) {
+ this.httpPort = httpPort;
+ }
+
+ public long getMaxHeap() {
+ return maxHeap;
+ }
+
+ public void setMaxHeap(long maxHeap) {
+ this.maxHeap = maxHeap;
+ }
+
+ public long getFreeHeap() {
+ return freeHeap;
+ }
+
+ public void setFreeHeap(long freeHeap) {
+ this.freeHeap = freeHeap;
+ }
+
+ public long getTotalHeap() {
+ return totalHeap;
+ }
+
+ public void setTotalHeap(long totalHeap) {
+ this.totalHeap = totalHeap;
+ }
+
+ public int getNumRunningTasks() {
+ return numRunningTasks;
+ }
+
+ public void setNumRunningTasks(int numRunningTasks) {
+ this.numRunningTasks = numRunningTasks;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
index 91fdb29..3164bd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
@@ -88,13 +88,15 @@ public class HttpServer {
final String appDir = getWebAppsPath(name);
ContextHandlerCollection contexts = new ContextHandlerCollection();
- webServer.setHandler(contexts);
webAppContext = new WebAppContext();
webAppContext.setDisplayName(name);
webAppContext.setContextPath("/");
- webAppContext.setWar(appDir + "/" + name);
- webServer.addHandler(webAppContext);
+ webAppContext.setResourceBase(appDir + "/" + name);
+ webAppContext.setDescriptor(appDir + "/" + name + "/WEB-INF/web.xml");
+
+ contexts.addHandler(webAppContext);
+ webServer.setHandler(contexts);
addDefaultApps(contexts, appDir, conf);
}
@@ -236,11 +238,10 @@ public class HttpServer {
}
}
- protected String getWebAppsPath(String appName) throws FileNotFoundException {
- URL url = getClass().getClassLoader().getResource("webapps/" + appName);
+ protected String getWebAppsPath(String name) throws FileNotFoundException {
+ URL url = getClass().getClassLoader().getResource("webapps/" + name);
if (url == null) {
- throw new FileNotFoundException("webapps/" + appName
- + " not found in CLASSPATH");
+ throw new FileNotFoundException("webapps/" + name + " not found in CLASSPATH");
}
String urlString = url.toString();
return urlString.substring(0, urlString.lastIndexOf('/'));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index 13cd98a..5cf2e7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -27,17 +27,18 @@ import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import java.io.IOException;
public class TajoQueryEngine {
- private final static Log LOG = LogFactory.getLog(TajoQueryEngine.class);
- private final StorageManager storageManager;
+
+ private final AbstractStorageManager storageManager;
private final PhysicalPlanner phyPlanner;
public TajoQueryEngine(TajoConf conf) throws IOException {
- this.storageManager = new StorageManager(conf);
+ this.storageManager = StorageManagerFactory.getStorageManager(conf);
this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index da8c062..05b5416 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -93,6 +93,8 @@ public class TajoWorker extends CompositeService {
private AtomicInteger numClusterSlots = new AtomicInteger();
+ private int httpPort;
+
public TajoWorker(String daemonMode) throws Exception {
super(TajoWorker.class.getName());
this.daemonMode = daemonMode;
@@ -114,7 +116,6 @@ public class TajoWorker extends CompositeService {
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
randomPort = false;
}
- int infoPort = tajoConf.getInt("tajo.worker.info.port", 8090);
int clientPort = tajoConf.getInt("tajo.worker.client.rpc.port", 8091);
int managerPort = tajoConf.getInt("tajo.worker.manager.rpc.port", 8092);
@@ -124,14 +125,6 @@ public class TajoWorker extends CompositeService {
tajoConf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, 0);
//infoPort = 0;
}
- try {
- //TODO WebServer port configurable
- webServer = StaticHttpServer.getInstance(this, "admin", null, infoPort,
- true, null, tajoConf, null);
- webServer.start();
- } catch (Exception e) {
- LOG.error("Can' start info http server:" + e.getMessage(), e);
- }
if(!"qm".equals(daemonMode)) {
taskRunnerManager = new TaskRunnerManager(workerContext);
@@ -149,7 +142,22 @@ public class TajoWorker extends CompositeService {
tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, managerPort);
addService(tajoWorkerManagerService);
- LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort=" + managerPort);
+ LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort="
+ + managerPort);
+
+ try {
+ httpPort = tajoConf.getInt("tajo.worker.http.port", 28080);
+ webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
+ true, null, tajoConf, null);
+ webServer.start();
+ httpPort = webServer.getPort();
+ LOG.info("Worker info server started:" + httpPort);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort="
+ + managerPort);
+
} else {
LOG.info("Tajo worker started: mode=" + daemonMode);
}
@@ -157,6 +165,10 @@ public class TajoWorker extends CompositeService {
super.init(conf);
}
+ public WorkerContext getWorkerContext() {
+ return workerContext;
+ }
+
@Override
public void start() {
super.start();
@@ -187,6 +199,12 @@ public class TajoWorker extends CompositeService {
tajoMasterRpc.close();
}
+ if(webServer != null) {
+ try {
+ webServer.stop();
+ } catch (Exception e) {
+ }
+ }
super.stop();
LOG.info("TajoWorker main thread exiting");
}
@@ -220,6 +238,9 @@ public class TajoWorker extends CompositeService {
return pullService;
}
+ public String getWorkerName() {
+ return getTajoWorkerManagerService().getHostAndPort();
+ }
public void stopWorker(boolean force) {
stop();
if(force) {
@@ -371,18 +392,26 @@ public class TajoWorker extends CompositeService {
.build());
}
}
+ TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
+ TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+ .setMaxHeap(Runtime.getRuntime().maxMemory())
+ .setFreeHeap(Runtime.getRuntime().freeMemory())
+ .setTotalHeap(Runtime.getRuntime().totalMemory())
+ .build();
+
TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
.addAllDisk(diskInfos)
- .setRunningTaskNum(0) //TODO
+ .setRunningTaskNum(taskRunnerManager == null ? 1 : taskRunnerManager.getNumTasks()) //TODO
.setSystem(systemInfo)
.setDiskSlots(workerDiskSlots)
+ .setJvmHeap(jvmHeap)
.build();
-
TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
.setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
.setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setTajoWorkerHttpPort(httpPort)
.setTajoWorkerPullServerPort(pullServerPort)
.setServerStatus(serverStatus)
.build();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 28cc5f6..0286afa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -128,10 +128,10 @@ public class TajoWorkerManagerService extends CompositeService
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
if(queryMasterTask == null || queryMasterTask.isStopped()) {
- LOG.info("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
+ LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
done.run(TaskSchedulerImpl.stopTaskRunnerReq);
} else {
- LOG.info("getTask:" + cid + ", ebId:" + ebId);
+ LOG.debug("getTask:" + cid + ", ebId:" + ebId);
queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index e66751c..915234f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -360,7 +360,6 @@ public class Task {
}
public void run() {
-
String errorMessage = null;
try {
context.setState(TaskAttemptState.TA_RUNNING);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index f1ca567..6d06083 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -82,6 +82,12 @@ public class TaskRunnerManager extends CompositeService {
}
}
+ public int getNumTasks() {
+ synchronized(taskRunnerMap) {
+ return taskRunnerMap.size();
+ }
+ }
+
public void startTask(final String[] params) {
//TODO change to use event dispatcher
Thread t = new Thread() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index 04abfa3..cdf78a6 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -42,10 +42,18 @@ message ServerStatusProto {
required int64 freeSpace = 3;
required int64 usableSpace = 4;
}
+
+ message JvmHeap {
+ required int64 maxHeap = 1;
+ required int64 totalHeap = 2;
+ required int64 freeHeap = 3;
+ }
+
required System system = 1;
required int32 diskSlots = 2;
repeated Disk disk = 3;
required int32 runningTaskNum = 4;
+ required JvmHeap jvmHeap = 5;
}
message TajoHeartbeat {
@@ -57,6 +65,9 @@ message TajoHeartbeat {
optional QueryState state = 6;
optional string statusMessage = 7;
optional int32 tajoWorkerPullServerPort = 8;
+ optional int32 tajoWorkerHttpPort = 9;
+ optional float queryProgress = 10;
+ optional int64 queryFinishTime = 11;
}
message TajoHeartbeatResponse {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml
new file mode 100644
index 0000000..5d00e3b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ version="2.5">
+ <display-name>Tajo Master</display-name>
+ <welcome-file-list>
+ <welcome-file>index.jsp</welcome-file>
+ </welcome-file-list>
+</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index 7cedfd4..becf417 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -8,6 +8,8 @@
<%@ page import="org.apache.tajo.master.*" %>
<%@ page import="org.apache.tajo.master.rm.*" %>
<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="java.text.SimpleDateFormat" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
@@ -19,24 +21,32 @@
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
CatalogService catalog = master.getCatalog();
Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+ List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ Collections.sort(wokerKeys);
%>
</head>
<body>
<img src='img/tajo_logo.png'/>
<hr/>
+
+<a href="index.jsp">Main</a>
+<a href="query.jsp">Query</a>
+<h3>Workers</h3>
+<div>Live:<%=wokerKeys.size()%></div>
<table>
- <tr><th>Worker</th><th>Ports</th><th>Slot</th></th><th>Memory(Used/Capacity)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+ <tr><th>Worker</th><th>Ports</th><th>Running Tasks</th><th>Slot</th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
<%
- List<String> wokerKeys = new ArrayList<String>(workers.keySet());
- Collections.sort(wokerKeys);
for(String eachWorker: wokerKeys) {
WorkerResource worker = workers.get(eachWorker);
+ String workerHttp = "http://" + worker.getAllocatedHost() + ":" + worker.getHttpPort();
%>
+
<tr>
- <td><%=eachWorker%></td>
+ <td><a href='<%=workerHttp%>'><%=eachWorker%></a></td>
<td><%=worker.portsToStr()%></td>
+ <td><%=worker.getNumRunningTasks()%></td>
<td><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
- <td><%=worker.getUsedMemoryMBSlots()%>/<%=worker.getMemoryMBSlots()%> MB</td>
+ <td><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
<td><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
<td><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
<td><%=worker.getWorkerStatus()%></td>
@@ -53,5 +63,54 @@
}
%>
</table>
+
+<%
+ Collection<QueryInProgress> runningQueries = master.getContext().getQueryJobManager().getRunningQueries();
+ Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
+%>
+<hr/>
+<h3>Running Queries</h3>
+<table>
+ <tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>sql</th></tr>
+<%
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ for(QueryInProgress eachQuery: runningQueries) {
+ long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime();
+%>
+ <tr>
+ <td><%=eachQuery.getQueryId()%></td>
+ <td><%=eachQuery.getQueryInfo().getQueryMasterHost()%></td>
+ <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
+ <td><%=(int)(eachQuery.getQueryInfo().getProgress() * 100.0f)%>%</td>
+ <td><%=(int)(time/1000)%> sec</td>
+ <td><%=eachQuery.getQueryInfo().getSql()%></td>
+ </tr>
+<%
+ }
+%>
+</table>
+
+<hr/>
+<h3>Finished Queries</h3>
+<table>
+ <tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Finished</th><th>Time</th><th>Status</th><th>sql</th></tr>
+ <%
+ for(QueryInProgress eachQuery: finishedQueries) {
+ long runTime = eachQuery.getQueryInfo().getFinishTime() == 0 ? -1 :
+ eachQuery.getQueryInfo().getFinishTime() - eachQuery.getQueryInfo().getStartTime();
+ %>
+ <tr>
+ <td><%=eachQuery.getQueryId()%></td>
+ <td><%=eachQuery.getQueryInfo().getQueryMasterHost()%></td>
+ <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
+ <td><%=df.format(eachQuery.getQueryInfo().getFinishTime())%></td>
+ <td><%=runTime%> ms</td>
+ <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
+ <td><%=eachQuery.getQueryInfo().getSql()%></td>
+ </tr>
+ <%
+ }
+ %>
+</table>
</body>
</html>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
new file mode 100644
index 0000000..10d8e13
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -0,0 +1,38 @@
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="java.util.*" %>
+<%@ page import="java.net.InetSocketAddress" %>
+<%@ page import="java.net.InetAddress" %>
+<%@ page import="org.apache.hadoop.conf.Configuration" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "./style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo main</title>
+ <%
+ TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ CatalogService catalog = master.getCatalog();
+ Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+ %>
+</head>
+<body>
+<img src='img/tajo_logo.png'/>
+<hr/>
+
+<a href="index.jsp">Main</a>
+<a href="query.jsp">Query</a>
+
+<div><h3>Query</h3></div>
+<div>
+ <textarea></textarea>
+</div>
+<div>Run Query</div>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml
new file mode 100644
index 0000000..8badd7a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ version="2.5">
+ <display-name>Tajo Worker</display-name>
+ <welcome-file-list>
+ <welcome-file>index.jsp</welcome-file>
+ </welcome-file-list>
+</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
new file mode 100644
index 0000000..4488598
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
@@ -0,0 +1,60 @@
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="java.util.*" %>
+<%@ page import="java.net.InetSocketAddress" %>
+<%@ page import="java.net.InetAddress" %>
+<%@ page import="org.apache.hadoop.conf.Configuration" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "./style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo main</title>
+<%
+ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ Collection<QueryMasterTask> queryMasterTasks = tajoWorker.getWorkerContext()
+ .getTajoWorkerManagerService().getQueryMaster().getQueryMasterTasks();
+%>
+</head>
+<body>
+<img src='img/tajo_logo.png'/>
+
+<h3><%=tajoWorker.getWorkerContext().getWorkerName()%></h3>
+<hr/>
+
+<table border=0>
+ <tr><td width='100'>MaxHeap: </td><td><%=Runtime.getRuntime().maxMemory()/1024/1024%> MB</td>
+ <tr><td width='100'>TotalHeap: </td><td><%=Runtime.getRuntime().totalMemory()/1024/1024%> MB</td>
+ <tr><td width='100'>FreeHeap: </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%> MB</td>
+</table>
+
+<h3>QueryMaster</h3>
+<table>
+
+<%
+ for(QueryMasterTask eachQueryMasterTask: queryMasterTasks) {
+ Query query = eachQueryMasterTask.getQuery();
+%>
+ <tr>
+ <td><a href='querydetail.jsp?queryId=<%=query.getId()%>'><%=query.getId()%></a></td>
+ <td><%=query.getFinishTime()%></td>
+ <td><%=query.getStartTime()%></td>
+ <td><%=query.getProgress()%></td>
+ </tr>
+<%
+ }
+%>
+</table>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
new file mode 100644
index 0000000..eaaafe5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -0,0 +1,60 @@
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="java.util.*" %>
+<%@ page import="java.net.InetSocketAddress" %>
+<%@ page import="java.net.InetAddress" %>
+<%@ page import="org.apache.hadoop.conf.Configuration" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.master.querymaster.*" %>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "./style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>Query Detail Info</title>
+<%
+ QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId"));
+ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
+ .getTajoWorkerManagerService().getQueryMaster().getQueryMasterTask(queryId);
+
+ Query query = queryMasterTask.getQuery();
+ Collection<SubQuery> subQueries = query.getSubQueries();
+
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ for(SubQuery eachSubQuery: subQueries) {
+%>
+ <div><%=eachSubQuery.getId()%>(<%=eachSubQuery.getState()%>)</div>
+ <div>Started:<%=df.format(eachSubQuery.getStartTime())%>, <%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%></div>
+ <table>
+ <tr><th>Id</th><th>Status</th><th>Start Time</th><th>Running Time</th><th>Host</th></tr>
+<%
+ QueryUnit[] queryUnits = eachSubQuery.getQueryUnits();
+ for(QueryUnit eachQueryUnit: queryUnits) {
+ //QueryUnitAttempt queryUnitAttempt = eachQueryUnit.getSuccessfulAttempt();
+%>
+ <tr>
+ <td><%=eachQueryUnit.getId()%></td>
+ <td><%=eachQueryUnit.getState()%></td>
+ <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td>
+ <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime()%> ms</td>
+ <td><%=eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost()%></td>
+ </tr>
+<%
+ }
+%>
+ </table>
+<%
+ }
+%>
+</head>
+<body>
\ No newline at end of file