You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/13 05:29:58 UTC
[3/4] TAJO-178: Implements StorageManager for scanning
asynchronously. (hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
new file mode 100644
index 0000000..628e7bc
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/style.css
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<style type="text/css" >
+
+ h5 {}
+ h1 {
+ font-size:25pt;
+ font-weight:bold;
+ }
+ h3 {
+ margin-top:5px;
+ margin-bottom:5px;
+ margin-left: 4px;
+ font-size:15pt;
+ font-weight:bold;
+ }
+ h2 {
+ margin-top:5px;
+ margin-bottom:5px;
+ margin-left:4px;
+ font-size:18pt;
+ font-weight:bold;
+ }
+ h2.line {
+ margin-top:45px;
+ margin-bottom:15px;
+ margin-left:4px;
+ color:#333333;
+ font-size:18pt;
+ font-weight:bold;
+ border-bottom:1px solid #999999;
+ }
+ h2.compactline{
+ margin-top:5px;
+ margin-bottom:5px;
+ margin-left:4px;
+ margin-right:4px;
+ color:#333333;
+ font-size:18pt;
+ font-weight:bold;
+ border-bottom:1px solid #999999;
+ }
+ td {
+ border: 1px solid #999999;
+ padding-left : 15px;
+ padding-top:2px;
+ padding-bottom:2px;
+ margin : 0px;
+ word-wrap: break-word;
+ }
+ td.long {
+ width:450px;
+ border: 1px solid #999999;
+ padding : 2px;
+ margin : 0px;
+ text-align:center;
+ }
+ th {
+ border: 1px solid #777777;
+ font-weight:bold;
+ color:#333333;
+ background-color:#cccccc;
+ text-align:left;
+ padding-left:15px;
+ padding-top:3px;
+ padding-bottom:3px;
+ }
+ th.small {
+ font-weight:bold;
+ width:100px;
+ }
+ table {
+ border-collapse:collapse;
+ }
+ table.new {
+ border-collapse:collapse;
+ width:95%;
+ border:1px solid #999999;
+ padding:5px;
+ table-layout:fixed;
+ }
+ table.noborder {
+ border-collapse:collapse;
+ width:98%;
+ border:none;
+ margin-top:5px;
+ }
+ td.rightborder{
+ padding-left:0px;
+ border-right:1px solid #cccccc;
+ border-top:none;
+ border-left:none;
+ border-bottom:none;
+ text-align:center;
+ }
+ td.noborder{
+ padding-left:0px;
+ border:none;
+ text-align:center;
+ }
+ th.rightbottom{
+ padding-left:0px;
+ border-right:1px solid #cccccc;
+ border-bottom:1px solid #cccccc;
+ border-top:none;
+ border-left:none;
+ background-color:#ffffff;
+ text-align:center;
+ }
+ th.bottom{
+ padding-left:0px;
+ border-right:none;
+ border-bottom:1px solid #cccccc;
+ border-top:none;
+ border-left:none;
+ background-color:#ffffff;
+ text-align:center;
+ }
+ iframe {
+ width:1024px;
+ overflow:hidden;
+ border:0px;
+ padding:0px;
+ }
+ div.tajoinfo {
+ width :350px;
+ height:45px;
+ border:1px solid black;
+ margin-top:80px;
+ margin-bottom:2px;
+ margin-left:80px;
+ margin-right:2px;
+ float:left;
+ }
+ div.tajoimage {
+ width:450px;
+ height:125px;
+ margin:3px;
+ float:left;
+ }
+ div.container {
+ width:860px;
+ margin:auto;
+ overflow:auto;
+ }
+ .topcontainer {
+ border:1px solid green;
+ margin:auto;
+ min-height:400px;
+ overflow:auto;
+ }
+ div.leftbox {
+ width:450px;
+ margin:6px;
+ padding:5px;
+ float:left;
+ }
+ div.bottombox {
+ margin:5px;
+ padding:5px;
+ }
+ div.leftcontent {
+ border:1px solid black;
+ margin:auto;
+ min-height:400px;
+ }
+ div.titlebox {
+ width:inherit;
+ border:2px solid #999999;
+ }
+ div.contentbox {
+ padding-top:5px;
+ padding-bottom:5px;
+ width:inherit;
+ border-left:2px solid #999999;
+ border-right:2px solid #999999;
+ border-bottom:1px solid #999999;
+ border-top:1px solid #999999;
+ overflow:hidden;
+ word-wrap:break-word;
+ }
+ div#tajotitle {
+ height:40px;
+ border:1px solid black;
+ margin-top:5px;
+ margin-bottom:5px;
+ }
+ div.jobbox {
+ width:300px;
+ margin:6px;
+ padding:5px;
+ float:left;
+ }
+ div#bottomtabletitle {
+ width:700px;
+ border:2px solid #999999;
+ }
+ div.center {
+ margin-top:10px;
+ text-align:center;
+ }
+ div.headline {
+ background-color:#999999;
+ padding:3px;
+ text-align:center;
+ }
+ div.headline_2 {
+ background-color:#999999;
+ padding:3px;
+ }
+ div.command {
+ margin:auto;
+ width:600px;
+ height:320px;
+ text-align:center;
+ }
+ textarea.command {
+ margin:3px;
+ width:550px;
+ height:310px;
+ }
+ hr {
+ border:1px solid #999999;
+ }
+ a.headline {
+ color:#ffffff;
+ font-size:13pt;
+ font-weight:bold;
+ text-decoration:none;
+ }
+ a.tablelink {
+ color:#000000;
+ text-decoration:none;
+ }
+ a.tablelink:hover {
+ color:#666666;
+ text-decoration:none;
+ }
+ font.maintitle {
+ margin-left:35px;
+ font-size:35pt;
+ font-weight:bold;
+ }
+ font.subtitle {
+ margin-left:35px;
+ font-size:25pt;
+ font-weight:bold;
+ }
+ ul li.off {
+ color:#ee4444;
+ }
+ ul li.on {
+ color:#33dd33;
+ }
+ div.outbox {
+ margin-top:5px;
+ padding-bottom:5px;
+ border:1px solid #cccccc;
+ }
+ div.outbox_order {
+ margin-top:7px;
+ padding-bottom:5px;
+ border:1px solid #cccccc;
+ width:425px;
+ height:auto;
+ float:left;
+ }
+
+ </style>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 5a85a07..d1efc1c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -21,21 +21,19 @@
*/
package org.apache.tajo;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.FileUtil;
import java.io.IOException;
-import java.util.UUID;
public class BackendTestingUtil {
public final static Schema mockupSchema;
@@ -51,7 +49,7 @@ public class BackendTestingUtil {
public static void writeTmpTable(TajoConf conf, Path path,
String tableName, boolean writeMeta)
throws IOException {
- StorageManager sm = StorageManager.get(conf, path);
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, path);
FileSystem fs = sm.getFileSystem();
Appender appender;
@@ -64,7 +62,7 @@ public class BackendTestingUtil {
if (writeMeta) {
FileUtil.writeProto(fs, new Path(tablePath.getParent(), ".meta"), mockupMeta.getProto());
}
- appender = StorageManager.getAppender(conf, mockupMeta, tablePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(mockupMeta, tablePath);
appender.init();
int deptSize = 10000;
@@ -85,28 +83,6 @@ public class BackendTestingUtil {
writeTmpTable(conf, new Path(parent), tableName, writeMeta);
}
- private TajoConf conf;
- private CatalogService catalog;
- private SQLAnalyzer analyzer;
- private LogicalPlanner planner;
- private LogicalOptimizer optimizer;
-
public BackendTestingUtil(TajoConf conf) throws IOException {
- this.conf = conf;
- this.catalog = new LocalCatalogWrapper(conf);
- analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
- }
-
- public static Path createTmpTestDir() throws IOException {
- String randomStr = UUID.randomUUID().toString();
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path dir = new Path("target/test-data", randomStr);
- // Have it cleaned up on exit
- if (fs.exists(dir)) {
- fs.delete(dir, true);
- }
- return dir;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 88029ea..e3b5fe7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -44,7 +44,6 @@ import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
import java.io.*;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.sql.ResultSet;
@@ -189,6 +188,8 @@ public class TajoTestingCluster {
// Do old style too just to be safe.
this.conf.set("fs.default.name", defaultFS.getUri().toString());
+ this.conf.set(TajoConf.ConfVars.ROOT_DIR.name(), defaultFS.getUri() + "/tajo");
+
return this.dfsCluster;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index fd492be..84a5c50 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -22,10 +22,6 @@ import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.apache.tajo.BackendTestingUtil;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TajoTestingCluster;
@@ -33,6 +29,10 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Set;
@@ -41,7 +41,7 @@ import static org.junit.Assert.*;
@Category(IntegrationTest.class)
public class TestTajoClient {
- private static TajoTestingCluster util;
+ private static TajoTestingCluster cluster;
private static TajoConf conf;
private static TajoClient tajo;
private static String TEST_PATH = "target/test-data/"
@@ -50,9 +50,9 @@ public class TestTajoClient {
@BeforeClass
public static void setUp() throws Exception {
- util = new TajoTestingCluster();
- util.startMiniCluster(1);
- conf = util.getConfiguration();
+ cluster = new TajoTestingCluster();
+ cluster.startMiniCluster(1);
+ conf = cluster.getConfiguration();
Thread.sleep(3000);
tajo = new TajoClient(conf);
@@ -61,7 +61,7 @@ public class TestTajoClient {
@AfterClass
public static void tearDown() throws Exception {
- util.shutdownMiniCluster();
+ cluster.shutdownMiniCluster();
if(tajo != null) {
tajo.close();
}
@@ -114,7 +114,7 @@ public class TestTajoClient {
@Test
public final void testCreateAndDropExternalTableByExecuteQuery() throws IOException, ServiceException {
- TajoConf conf = util.getConfiguration();
+ TajoConf conf = cluster.getConfiguration();
final String tableName = "testCreateAndDropExternalTableByExecuteQuery";
BackendTestingUtil.writeTmpTable(conf, "file:///tmp", tableName, false);
@@ -135,7 +135,7 @@ public class TestTajoClient {
@Test
public final void testCreateAndDropTableByExecuteQuery() throws IOException, ServiceException {
- TajoConf conf = util.getConfiguration();
+ TajoConf conf = cluster.getConfiguration();
final String tableName = "testCreateAndDropTableByExecuteQuery";
assertFalse(tajo.existTable(tableName));
@@ -145,8 +145,8 @@ public class TestTajoClient {
tajo.updateQuery(tql);
assertTrue(tajo.existTable(tableName));
- FileSystem hdfs = FileSystem.get(conf);
Path tablePath = tajo.getTableDesc(tableName).getPath();
+ FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
tajo.updateQuery("drop table " + tableName);
@@ -156,7 +156,7 @@ public class TestTajoClient {
@Test
public final void testDDLByExecuteQuery() throws IOException, ServiceException {
- TajoConf conf = util.getConfiguration();
+ TajoConf conf = cluster.getConfiguration();
final String tableName = "testDDLByExecuteQuery";
BackendTestingUtil.writeTmpTable(conf, "file:///tmp", tableName, false);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index ede73c5..66060ce 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -66,7 +66,7 @@ public class TestGlobalQueryPlanner {
private static LogicalPlanner logicalPlanner;
private static LogicalOptimizer optimizer;
private static QueryId queryId;
- private static StorageManager sm;
+ private static AbstractStorageManager sm;
@BeforeClass
public static void setup() throws Exception {
@@ -89,7 +89,7 @@ public class TestGlobalQueryPlanner {
catalog.registerFunction(funcDesc);
}
- sm = new StorageManager(util.getConfiguration());
+ sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4));
@@ -100,7 +100,7 @@ public class TestGlobalQueryPlanner {
dispatcher.init(conf);
dispatcher.start();
- planner = new GlobalPlanner(conf, new StorageManager(conf),
+ planner = new GlobalPlanner(conf, sm,
dispatcher.getEventHandler());
analyzer = new SQLAnalyzer();
logicalPlanner = new LogicalPlanner(catalog);
@@ -124,7 +124,7 @@ public class TestGlobalQueryPlanner {
fs.delete(tablePath.getParent(), true);
}
fs.mkdirs(tablePath.getParent());
- appender = StorageManager.getAppender(conf, meta, tablePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
tupleNum = 100;
for (j = 0; j < tupleNum; j++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index c665b44..7572ad5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -79,7 +79,7 @@ public class TestGlobalQueryOptimizer {
conf = new TajoConf(util.getConfiguration());
catalog = util.getMiniCatalogCluster().getCatalog();
- StorageManager sm = new StorageManager(util.getConfiguration());
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4));
@@ -88,7 +88,7 @@ public class TestGlobalQueryOptimizer {
AsyncDispatcher dispatcher = new AsyncDispatcher();
- planner = new GlobalPlanner(conf, new StorageManager(conf),
+ planner = new GlobalPlanner(conf, sm,
dispatcher.getEventHandler());
analyzer = new SQLAnalyzer();
logicalPlanner = new LogicalPlanner(catalog);
@@ -112,7 +112,7 @@ public class TestGlobalQueryOptimizer {
fs.delete(tablePath.getParent(), true);
}
fs.mkdirs(tablePath.getParent());
- appender = StorageManager.getAppender(conf, meta, tablePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
tupleNum = 100;
for (j = 0; j < tupleNum; j++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 97459f9..b714981 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -54,7 +54,7 @@ public class TestBNLJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private static int OUTER_TUPLE_NUM = 1000;
@@ -69,7 +69,7 @@ public class TestBNLJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -79,7 +79,7 @@ public class TestBNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
@@ -100,7 +100,7 @@ public class TestBNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 1c651f6..8021882 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -59,7 +59,7 @@ public class TestBSTIndexExec {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Schema idxSchema;
private TupleComparator comp;
private BSTIndex.BSTIndexWriter writer;
@@ -82,7 +82,7 @@ public class TestBSTIndexExec {
catalog = util.getMiniCatalogCluster().getCatalog();
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
- sm = StorageManager.get(conf, workDir);
+ sm = StorageManagerFactory.getStorageManager(conf, workDir);
idxPath = new Path(workDir, "test.idx");
@@ -108,7 +108,7 @@ public class TestBSTIndexExec {
fs = tablePath.getFileSystem(conf);
fs.mkdirs(tablePath.getParent());
- FileAppender appender = (FileAppender)StorageManager.getAppender(conf, meta, tablePath);
+ FileAppender appender = (FileAppender)StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
appender.init();
Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
for (int i = 0; i < 10000; i++) {
@@ -150,7 +150,9 @@ public class TestBSTIndexExec {
@Test
public void testEqual() throws Exception {
-
+ if(conf.getBoolean("tajo.storage.manager.v2", false)) {
+ return;
+ }
this.rndKey = rnd.nextInt(250);
final String QUERY = "select * from employee where managerId = " + rndKey;
@@ -180,7 +182,7 @@ public class TestBSTIndexExec {
}
private class TmpPlanner extends PhysicalPlannerImpl {
- public TmpPlanner(TajoConf conf, StorageManager sm) {
+ public TmpPlanner(TajoConf conf, AbstractStorageManager sm) {
super(conf, sm);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 0fc3773..01fd370 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -51,7 +51,7 @@ public class TestExternalSortExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
@@ -67,7 +67,7 @@ public class TestExternalSortExec {
util = new TajoTestingCluster();
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -76,7 +76,7 @@ public class TestExternalSortExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.enableStats();
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -138,6 +138,7 @@ public class TestExternalSortExec {
int cnt = 0;
exec.init();
long start = System.currentTimeMillis();
+
while ((tuple = exec.next()) != null) {
curVal = tuple.get(0);
if (preVal != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 8d80d9e..886dddc 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -51,7 +51,7 @@ public class TestHashAntiJoinExec {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -64,7 +64,7 @@ public class TestHashAntiJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestHashAntiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -100,7 +100,7 @@ public class TestHashAntiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 10; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index e270df3..cf89cf8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -53,7 +53,7 @@ public class TestHashJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -66,7 +66,7 @@ public class TestHashJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -77,7 +77,7 @@ public class TestHashJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 10; i++) {
@@ -99,7 +99,7 @@ public class TestHashJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 10; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 317c1f2..d986a8f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -51,7 +51,7 @@ public class TestHashSemiJoinExec {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -64,7 +64,7 @@ public class TestHashSemiJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestHashSemiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -100,7 +100,7 @@ public class TestHashSemiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
// make 27 tuples
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 776882b..e77a734 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -55,7 +55,7 @@ public class TestMergeJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private TableDesc employee;
private TableDesc people;
@@ -68,7 +68,7 @@ public class TestMergeJoinExec {
Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
FileSystem fs = testDir.getFileSystem(conf);
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerId", Type.INT4);
@@ -79,7 +79,7 @@ public class TestMergeJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 10; i++) {
@@ -108,7 +108,7 @@ public class TestMergeJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 10; i += 2) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 9289dc9..2d82f6c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -53,7 +53,7 @@ public class TestNLJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Path testDir;
private TableDesc employee;
@@ -65,7 +65,7 @@ public class TestNLJoinExec {
catalog = util.startCatalogCluster().getCatalog();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
conf = util.getConfiguration();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -75,7 +75,7 @@ public class TestNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 50; i++) {
@@ -99,7 +99,7 @@ public class TestNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peoplePath);
appender.init();
tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
for (int i = 1; i < 50; i += 2) {
@@ -151,7 +151,8 @@ public class TestNLJoinExec {
int i = 0;
exec.init();
- while (exec.next() != null) {
+ Tuple tuple = null;
+ while ( (tuple = exec.next()) != null) {
i++;
}
exec.close();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 75e3b1e..5358d3a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -69,7 +69,7 @@ public class TestPhysicalPlanner {
private static SQLAnalyzer analyzer;
private static LogicalPlanner planner;
private static LogicalOptimizer optimizer;
- private static StorageManager sm;
+ private static AbstractStorageManager sm;
private static Path testDir;
private static TableDesc employee = null;
@@ -82,7 +82,7 @@ public class TestPhysicalPlanner {
util.startCatalogCluster();
conf = util.getConfiguration();
testDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
catalog.registerFunction(funcDesc);
@@ -107,7 +107,7 @@ public class TestPhysicalPlanner {
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 100; i++) {
@@ -123,7 +123,7 @@ public class TestPhysicalPlanner {
Path scorePath = new Path(testDir, "score");
TableMeta scoreMeta = CatalogUtil.newTableMeta(scoreSchema, StoreType.CSV, new Options());
- appender = StorageManager.getAppender(conf, scoreMeta, scorePath);
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scorePath);
appender.init();
score = new TableDescImpl("score", scoreMeta, scorePath);
tuple = new VTuple(score.getMeta().getSchema().getColumnNum());
@@ -189,7 +189,7 @@ public class TestPhysicalPlanner {
optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
Tuple tuple;
@@ -372,7 +372,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = StorageManager.getScanner(conf, outputMeta, ctx.getOutputPath());
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath());
scanner.init();
Tuple tuple;
int i = 0;
@@ -412,7 +412,7 @@ public class TestPhysicalPlanner {
exec.next();
exec.close();
- Scanner scanner = StorageManager.getScanner(conf, outputMeta, ctx.getOutputPath());
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath());
scanner.init();
Tuple tuple;
int i = 0;
@@ -787,8 +787,8 @@ public class TestPhysicalPlanner {
reader.open();
Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
TableMeta meta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV, new Options());
- SeekableScanner scanner = (SeekableScanner)
- StorageManager.getScanner(conf, meta, outputPath);
+ SeekableScanner scanner =
+ StorageManagerFactory.getSeekableScanner(conf, meta, outputPath);
scanner.init();
int cnt = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 0151cb3..06c5bb7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -50,7 +50,7 @@ public class TestSortExec {
private static SQLAnalyzer analyzer;
private static LogicalPlanner planner;
private static LogicalOptimizer optimizer;
- private static StorageManager sm;
+ private static AbstractStorageManager sm;
private static TajoTestingCluster util;
private static Path workDir;
private static Path tablePath;
@@ -64,7 +64,7 @@ public class TestSortExec {
util = new TajoTestingCluster();
catalog = util.startCatalogCluster().getCatalog();
workDir = CommonTestingUtil.getTestDir(TEST_PATH);
- sm = StorageManager.get(conf, workDir);
+ sm = StorageManagerFactory.getStorageManager(conf, workDir);
Schema schema = new Schema();
schema.addColumn("managerId", Type.INT4);
@@ -76,7 +76,7 @@ public class TestSortExec {
tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
sm.getFileSystem().mkdirs(tablePath.getParent());
- Appender appender = StorageManager.getAppender(conf, employeeMeta, tablePath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, tablePath);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
for (int i = 0; i < 100; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
index ba7d36b..d006679 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
@@ -22,22 +22,19 @@
package org.apache.tajo.engine.query;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.sql.ResultSetMetaData;
@@ -49,8 +46,8 @@ import static org.junit.Assert.*;
public class TestResultSetImpl {
private static TajoTestingCluster util;
private static TajoConf conf;
- private static StorageManager sm;
private static TableDesc desc;
+ private static AbstractStorageManager sm;
private static TableMeta scoreMeta;
@BeforeClass
@@ -58,7 +55,7 @@ public class TestResultSetImpl {
util = new TajoTestingCluster();
util.startMiniCluster(3);
conf = util.getConfiguration();
- sm = new StorageManager(conf);
+ sm = StorageManagerFactory.getStorageManager(conf);
Schema scoreSchema = new Schema();
scoreSchema.addColumn("deptname", Type.TEXT);
@@ -68,7 +65,7 @@ public class TestResultSetImpl {
Path p = sm.getTablePath("score");
sm.getFileSystem().mkdirs(p);
- Appender appender = StorageManager.getAppender(conf, scoreMeta, new Path(p, "score"));
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, new Path(p, "score"));
appender.init();
int deptSize = 100;
int tupleNum = 10000;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index a67dd26..b5ce437 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -33,7 +33,8 @@ import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -68,7 +69,7 @@ public class TestExecutionBlockCursor {
logicalPlanner = new LogicalPlanner(catalog);
optimizer = new LogicalOptimizer();
- StorageManager sm = new StorageManager(conf);
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index e44ca99..c070c4d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -19,13 +19,9 @@
package org.apache.tajo.storage;
import com.google.common.collect.Sets;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
@@ -35,10 +31,14 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.util.FileUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
import java.util.Set;
@@ -46,20 +46,20 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
public class TestRowFile {
- private TajoTestingCluster util;
- private Configuration conf;
+ private TajoTestingCluster cluster;
+ private TajoConf conf;
@Before
public void setup() throws Exception {
- util = new TajoTestingCluster();
- conf = util.getConfiguration();
+ cluster = new TajoTestingCluster();
+ conf = cluster.getConfiguration();
conf.setInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname, 100);
- util.startMiniDFSCluster(1);
+ cluster.startMiniDFSCluster(1);
}
@After
public void teardown() throws Exception {
- util.shutdownMiniDFSCluster();
+ cluster.shutdownMiniDFSCluster();
}
@Test
@@ -71,15 +71,18 @@ public class TestRowFile {
TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.ROWFILE);
- Path tablePath = new Path("hdfs:///test");
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf,
+ new Path(conf.get(TajoConf.ConfVars.ROOT_DIR.name())));
+
+ Path tablePath = new Path("/test");
Path metaPath = new Path(tablePath, ".meta");
Path dataPath = new Path(tablePath, "test.tbl");
- FileSystem fs = tablePath.getFileSystem(conf);
+ FileSystem fs = sm.getFileSystem();
fs.mkdirs(tablePath);
- FileUtil.writeProto(util.getDefaultFileSystem(), metaPath, meta.getProto());
+ FileUtil.writeProto(fs, metaPath, meta.getProto());
- Appender appender = StorageManager.getAppender(conf, meta, dataPath);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, dataPath);
appender.enableStats();
appender.init();
@@ -96,7 +99,6 @@ public class TestRowFile {
tuple.put(2, stringDatum);
appender.addTuple(tuple);
idSet.add(i+1);
-// System.out.println(tuple.toString());
}
long end = System.currentTimeMillis();
@@ -105,21 +107,20 @@ public class TestRowFile {
TableStat stat = appender.getStats();
assertEquals(tupleNum, stat.getNumRows().longValue());
- System.out.println("append time: " + (end-start));
+ System.out.println("append time: " + (end - start));
FileStatus file = fs.getFileStatus(dataPath);
TableProto proto = (TableProto) FileUtil.loadProto(
- util.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
+ cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
meta = new TableMetaImpl(proto);
Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen());
int tupleCnt = 0;
start = System.currentTimeMillis();
- Scanner scanner = StorageManager.getScanner(conf, meta, fragment);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment);
scanner.init();
while ((tuple=scanner.next()) != null) {
tupleCnt++;
-// System.out.println(tuple.toString());
}
scanner.close();
end = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index cf1e9ae..4f2795b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -64,7 +64,7 @@ public class TestRangeRetrieverHandler {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private Schema schema;
private static int TEST_TUPLE = 10000;
private FileSystem fs;
@@ -78,7 +78,7 @@ public class TestRangeRetrieverHandler {
fs = testDir.getFileSystem(conf);
util.startCatalogCluster();
catalog = util.getMiniCatalogCluster().getCatalog();
- sm = StorageManager.get(conf, testDir);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
@@ -108,7 +108,7 @@ public class TestRangeRetrieverHandler {
Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
fs.mkdirs(tableDir.getParent());
- Appender appender = sm.getAppender(conf, employeeMeta, tableDir);
+ Appender appender = sm.getAppender(employeeMeta, tableDir);
appender.init();
Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
@@ -161,8 +161,10 @@ public class TestRangeRetrieverHandler {
BSTIndex.BSTIndexReader reader = bst.getIndexReader(
new Path(testDir, "output/index"), keySchema, comp);
reader.open();
- SeekableScanner scanner = (SeekableScanner)
- sm.getScanner(conf, employeeMeta, StorageUtil.concatPath(testDir, "output", "output"));
+
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, employeeMeta,
+ StorageUtil.concatPath(testDir, "output", "output"));
+
scanner.init();
int cnt = 0;
while(scanner.next() != null) {
@@ -220,7 +222,7 @@ public class TestRangeRetrieverHandler {
TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv");
fs.mkdirs(tablePath.getParent());
- Appender appender = sm.getAppender(conf, meta, tablePath);
+ Appender appender = sm.getAppender(meta, tablePath);
appender.init();
Tuple tuple = new VTuple(meta.getSchema().getColumnNum());
for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) {
@@ -271,8 +273,8 @@ public class TestRangeRetrieverHandler {
BSTIndex.BSTIndexReader reader = bst.getIndexReader(
new Path(testDir, "output/index"), keySchema, comp);
reader.open();
- SeekableScanner scanner = (SeekableScanner) StorageManager.getScanner(
- conf, meta, StorageUtil.concatPath(testDir, "output", "output"));
+ SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta,
+ StorageUtil.concatPath(testDir, "output", "output"));
scanner.init();
int cnt = 0;
while(scanner.next() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
new file mode 100644
index 0000000..2b59ecb
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.TableMetaImpl;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AbstractStorageManager {
+ private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
+
+ protected final TajoConf conf;
+ protected final FileSystem fs;
+ protected final Path baseDir;
+ protected final Path tableBaseDir;
+ protected final boolean blocksMetadataEnabled;
+
+ /**
+ * Cache of scanner handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+
+ /**
+ * Cache of appender handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
+
+ /**
+ * Cache of constructors for each class. Pins the classes so they
+ * can't be garbage collected until ReflectionUtils can be collected.
+ */
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+ new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+ public abstract Scanner getScanner(TableMeta meta, Fragment fragment,
+ Schema target) throws IOException;
+
+ protected AbstractStorageManager(TajoConf conf) throws IOException {
+ this.conf = conf;
+ this.baseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR));
+ this.tableBaseDir = TajoConf.getWarehousePath(conf);
+ this.fs = baseDir.getFileSystem(conf);
+ this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+ if (!this.blocksMetadataEnabled)
+ LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+ }
+
+ public Scanner getScanner(TableMeta meta, Path path)
+ throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus status = fs.getFileStatus(path);
+ Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
+ return getScanner(meta, fragment);
+ }
+
+ public Scanner getScanner(TableMeta meta, Fragment fragment)
+ throws IOException {
+ return getScanner(meta, fragment, meta.getSchema());
+ }
+
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
+ public Path getBaseDir() {
+ return this.baseDir;
+ }
+
+ public Path getTableBaseDir() {
+ return this.tableBaseDir;
+ }
+
+ public void delete(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ fs.delete(tablePath, true);
+ }
+
+ public boolean exists(Path path) throws IOException {
+ FileSystem fileSystem = path.getFileSystem(conf);
+ return fileSystem.exists(path);
+ }
+
+ /**
+ * This method deletes only data contained in the given path.
+ *
+ * @param path The path in which data are deleted.
+ * @throws IOException
+ */
+ public void deleteData(Path path) throws IOException {
+ FileSystem fileSystem = path.getFileSystem(conf);
+ FileStatus[] fileLists = fileSystem.listStatus(path);
+ for (FileStatus status : fileLists) {
+ fileSystem.delete(status.getPath(), true);
+ }
+ }
+
+ public Path getTablePath(String tableName) {
+ return new Path(tableBaseDir, tableName);
+ }
+
+ public Appender getAppender(TableMeta meta, Path path)
+ throws IOException {
+ Appender appender;
+
+ Class<? extends FileAppender> appenderClass;
+
+ String handlerName = meta.getStoreType().name().toLowerCase();
+ appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+ if (appenderClass == null) {
+ appenderClass = conf.getClass(
+ String.format("tajo.storage.appender-handler.%s.class",
+ meta.getStoreType().name().toLowerCase()), null,
+ FileAppender.class);
+ APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+ }
+
+ if (appenderClass == null) {
+ throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+ }
+
+ appender = newAppenderInstance(appenderClass, conf, meta, path);
+
+ return appender;
+ }
+
+
+ public TableMeta getTableMeta(Path tablePath) throws IOException {
+ TableMeta meta;
+
+ FileSystem fs = tablePath.getFileSystem(conf);
+ Path tableMetaPath = new Path(tablePath, ".meta");
+ if (!fs.exists(tableMetaPath)) {
+ throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
+ }
+
+ FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
+
+ CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
+ CatalogProtos.TableProto.getDefaultInstance());
+ meta = new TableMetaImpl(tableProto);
+
+ return meta;
+ }
+
+ public Fragment[] split(String tableName) throws IOException {
+ Path tablePath = new Path(tableBaseDir, tableName);
+ return split(tableName, tablePath, fs.getDefaultBlockSize());
+ }
+
+ public Fragment[] split(String tableName, long fragmentSize) throws IOException {
+ Path tablePath = new Path(tableBaseDir, tableName);
+ return split(tableName, tablePath, fragmentSize);
+ }
+
+ public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ TableMeta meta = getTableMeta(tablePath);
+ List<Fragment> listTablets = new ArrayList<Fragment>();
+ Fragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
+ listTablets.add(tablet);
+ }
+
+ Fragment[] tablets = new Fragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public Fragment[] split(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
+ }
+
+ public Fragment[] split(String tableName, Path tablePath) throws IOException {
+ return split(tableName, tablePath, fs.getDefaultBlockSize());
+ }
+
+ private Fragment[] split(String tableName, Path tablePath, long size)
+ throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ TableMeta meta = getTableMeta(tablePath);
+ long defaultBlockSize = size;
+ List<Fragment> listTablets = new ArrayList<Fragment>();
+ Fragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
+ } else {
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
+ }
+ }
+
+ Fragment[] tablets = new Fragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+ Path tablePath, long size)
+ throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ long defaultBlockSize = size;
+ List<Fragment> listTablets = new ArrayList<Fragment>();
+ Fragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
+ } else {
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
+ }
+ }
+
+ Fragment[] tablets = new Fragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public void writeTableMeta(Path tableRoot, TableMeta meta)
+ throws IOException {
+ FileSystem fs = tableRoot.getFileSystem(conf);
+ FSDataOutputStream out = fs.create(new Path(tableRoot, ".meta"));
+ FileUtil.writeProto(out, meta.getProto());
+ out.flush();
+ out.close();
+ }
+
+ public long calculateSize(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ long totalSize = 0;
+
+ if (fs.exists(tablePath)) {
+ for (FileStatus status : fs.listStatus(tablePath)) {
+ totalSize += status.getLen();
+ }
+ }
+
+ return totalSize;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // FileInputFormat Area
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final PathFilter hiddenFileFilter = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Proxy PathFilter that accepts a path only if all filters given in the
+ * constructor do. Used by the listPaths() to apply the built-in
+ * hiddenFileFilter together with a user provided one (if any).
+ */
+ private static class MultiPathFilter implements PathFilter {
+ private List<PathFilter> filters;
+
+ public MultiPathFilter(List<PathFilter> filters) {
+ this.filters = filters;
+ }
+
+ public boolean accept(Path path) {
+ for (PathFilter filter : filters) {
+ if (!filter.accept(path)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * List input directories.
+ * Subclasses may override to, e.g., select only files matching a regular
+ * expression.
+ *
+ * @return array of FileStatus objects
+ * @throws IOException if zero items.
+ */
+ protected List<FileStatus> listStatus(Path path) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ Path[] dirs = new Path[]{path};
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+
+ List<IOException> errors = new ArrayList<IOException>();
+
+ // creates a MultiPathFilter with the hiddenFileFilter and the
+ // user provided one (if any).
+ List<PathFilter> filters = new ArrayList<PathFilter>();
+ filters.add(hiddenFileFilter);
+
+ PathFilter inputFilter = new MultiPathFilter(filters);
+
+ for (int i = 0; i < dirs.length; ++i) {
+ Path p = dirs[i];
+
+ FileSystem fs = p.getFileSystem(conf);
+ FileStatus[] matches = fs.globStatus(p, inputFilter);
+ if (matches == null) {
+ errors.add(new IOException("Input path does not exist: " + p));
+ } else if (matches.length == 0) {
+ errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+ } else {
+ for (FileStatus globStat : matches) {
+ if (globStat.isDirectory()) {
+ for (FileStatus stat : fs.listStatus(globStat.getPath(),
+ inputFilter)) {
+ result.add(stat);
+ }
+ } else {
+ result.add(globStat);
+ }
+ }
+ }
+ }
+
+ if (!errors.isEmpty()) {
+ throw new InvalidInputException(errors);
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
+ /**
+ * Get the lower bound on split size imposed by the format.
+ *
+ * @return the number of bytes of the minimal split for this format
+ */
+ protected long getFormatMinSplitSize() {
+ return 1;
+ }
+
+ /**
+ * Is the given filename splitable? Usually, true, but if the file is
+ * stream compressed, it will not be.
+ * <p/>
+ * <code>FileInputFormat</code> implementations can override this and return
+ * <code>false</code> to ensure that individual input files are never split-up
+ * so that Mappers process entire files.
+ *
+ * @param filename the file name to check
+ * @return is this file isSplittable?
+ */
+ protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
+ Scanner scanner = getScanner(meta, filename);
+ return scanner.isSplittable();
+ }
+
+ @Deprecated
+ protected long computeSplitSize(long blockSize, long minSize,
+ long maxSize) {
+ return Math.max(minSize, Math.min(maxSize, blockSize));
+ }
+
+ @Deprecated
+ private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+ @Deprecated
+ protected int getBlockIndex(BlockLocation[] blkLocations,
+ long offset) {
+ for (int i = 0; i < blkLocations.length; i++) {
+ // is the offset inside this block?
+ if ((blkLocations[i].getOffset() <= offset) &&
+ (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+ return i;
+ }
+ }
+ BlockLocation last = blkLocations[blkLocations.length - 1];
+ long fileLength = last.getOffset() + last.getLength() - 1;
+ throw new IllegalArgumentException("Offset " + offset +
+ " is outside of file (0.." +
+ fileLength + ")");
+ }
+
+ /**
+ * A factory that makes the split for this class. It can be overridden
+ * by sub-classes to make sub-types
+ */
+ protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
+ return new Fragment(fragmentId, file, meta, start, length);
+ }
+
+ protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
+ int[] diskIds) throws IOException {
+ return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
+ }
+
+ // for Non Splittable. eg, compressed gzip TextFile
+ protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+ BlockLocation[] blkLocations) throws IOException {
+
+ Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+ for (BlockLocation blockLocation : blkLocations) {
+ for (String host : blockLocation.getHosts()) {
+ if (hostsBlockMap.containsKey(host)) {
+ hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+ } else {
+ hostsBlockMap.put(host, 1);
+ }
+ }
+ }
+
+ List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+ @Override
+ public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+ return v1.getValue().compareTo(v2.getValue());
+ }
+ });
+
+ String[] hosts = new String[blkLocations[0].getHosts().length];
+ int[] hostsBlockCount = new int[blkLocations[0].getHosts().length];
+
+ for (int i = 0; i < hosts.length; i++) {
+ Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+ hosts[i] = entry.getKey();
+ hostsBlockCount[i] = entry.getValue();
+ }
+ return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
+ }
+
+ /**
+ * Get the maximum split size.
+ *
+ * @return the maximum number of bytes a split can include
+ */
+ @Deprecated
+ public static long getMaxSplitSize() {
+ // TODO - to be configurable
+ return 536870912L;
+ }
+
+ /**
+ * Get the minimum split size
+ *
+ * @return the minimum number of bytes that can be in a split
+ */
+ @Deprecated
+ public static long getMinSplitSize() {
+ // TODO - to be configurable
+ return 67108864L;
+ }
+
+ /**
+ * Get Disk Ids by Volume Bytes
+ */
+ private int[] getDiskIds(VolumeId[] volumeIds) {
+ int[] diskIds = new int[volumeIds.length];
+ for (int i = 0; i < volumeIds.length; i++) {
+ int diskId = -1;
+ if (volumeIds[i] != null && volumeIds[i].isValid()) {
+ String volumeIdString = volumeIds[i].toString();
+ byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
+
+ if (volumeIdBytes.length == 4) {
+ diskId = Bytes.toInt(volumeIdBytes);
+ } else if (volumeIdBytes.length == 1) {
+ diskId = (int) volumeIdBytes[0]; // support hadoop-2.0.2
+ }
+ }
+ diskIds[i] = diskId;
+ }
+ return diskIds;
+ }
+
+ /**
+ * Generate the map of host and make them into Volume Ids.
+ *
+ */
+ private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
+ Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+ for (Fragment frag : frags) {
+ String[] hosts = frag.getHosts();
+ int[] diskIds = frag.getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ Set<Integer> volumeList = volumeMap.get(hosts[i]);
+ if (volumeList == null) {
+ volumeList = new HashSet<Integer>();
+ volumeMap.put(hosts[i], volumeList);
+ }
+
+ if (diskIds.length > 0 && diskIds[i] > -1) {
+ volumeList.add(diskIds[i]);
+ }
+ }
+ }
+
+ return volumeMap;
+ }
+ /**
+ * Generate the list of files and make them into FileSplits.
+ *
+ * @throws IOException
+ */
+ public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
+ // generate splits'
+
+ List<Fragment> splits = new ArrayList<Fragment>();
+ List<FileStatus> files = listStatus(inputPath);
+ FileSystem fs = inputPath.getFileSystem(conf);
+ for (FileStatus file : files) {
+ Path path = file.getPath();
+ long length = file.getLen();
+ if (length > 0) {
+ BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ boolean splittable = isSplittable(meta, path);
+ if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+ // supported disk volume
+ BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
+ .getFileBlockStorageLocations(Arrays.asList(blkLocations));
+ if (splittable) {
+ for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+ splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
+ .getVolumeIds())));
+ }
+ } else { // Non splittable
+ splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
+ }
+
+ } else {
+ if (splittable) {
+ for (BlockLocation blockLocation : blkLocations) {
+ splits.add(makeSplit(tableName, meta, path, blockLocation, null));
+ }
+ } else { // Non splittable
+ splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
+ }
+ }
+ } else {
+ //for zero length files
+ splits.add(makeSplit(tableName, meta, path, 0, length));
+ }
+ }
+
+ LOG.info("Total # of splits: " + splits.size());
+ return splits;
+ }
+
+ private class InvalidInputException extends IOException {
+ public InvalidInputException(
+ List<IOException> errors) {
+ }
+ }
+
+ private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+ Configuration.class,
+ TableMeta.class,
+ Fragment.class
+ };
+
+ private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+ Configuration.class,
+ TableMeta.class,
+ Path.class
+ };
+
+ /**
+ * create a scanner instance.
+ */
+ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta,
+ Fragment fragment) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, meta, fragment});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+
+ /**
+ * create a scanner instance.
+ */
+ public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta,
+ Path path) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, meta, path});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
index 4f6dde1..6c31247 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
@@ -23,15 +23,17 @@ import com.google.gson.Gson;
import com.google.gson.annotations.Expose;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.json.GsonObject;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.json.GsonObject;
import org.apache.tajo.storage.json.StorageGsonHelper;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject, GsonObject {
protected FragmentProto.Builder builder = null;
@@ -44,8 +46,8 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
@Expose private boolean distCached = false; // optional
private String[] hosts; // Datanode hostnames
- private int[] hostsBlockCount; // list of block count of hosts
- private int[] diskIds;
+ @Expose private int[] hostsBlockCount; // list of block count of hosts
+ @Expose private int[] diskIds;
public Fragment() {
builder = FragmentProto.newBuilder();
@@ -53,13 +55,13 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
public Fragment(String tableName, Path uri, TableMeta meta, BlockLocation blockLocation, int[] diskIds) throws IOException {
this();
- TableMeta newMeta = new TableMetaImpl(meta.getProto());
+ //TableMeta newMeta = new TableMetaImpl(meta.getProto());
+ TableMeta newMeta = meta;
SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
.getSchema().getProto());
newMeta.setSchema(new Schema(newSchemaProto));
- this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength());
- this.hosts = blockLocation.getHosts();
- this.diskIds = diskIds;
+ this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength(),
+ blockLocation.getHosts(), diskIds);
}
// Non splittable
@@ -69,7 +71,7 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
.getSchema().getProto());
newMeta.setSchema(new Schema(newSchemaProto));
- this.set(tableName, uri, newMeta, start, length);
+ this.set(tableName, uri, newMeta, start, length, null, null);
this.hosts = hosts;
this.hostsBlockCount = hostsBlockCount;
}
@@ -80,26 +82,35 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(fragmentId, meta
.getSchema().getProto());
newMeta.setSchema(new Schema(newSchemaProto));
- this.set(fragmentId, path, newMeta, start, length);
+ this.set(fragmentId, path, newMeta, start, length, null, null);
}
public Fragment(FragmentProto proto) {
this();
TableMeta newMeta = new TableMetaImpl(proto.getMeta());
+ int[] diskIds = new int[proto.getDiskIdsList().size()];
+ int i = 0;
+ for(Integer eachValue: proto.getDiskIdsList()) {
+ diskIds[i++] = eachValue;
+ }
this.set(proto.getId(), new Path(proto.getPath()), newMeta,
- proto.getStartOffset(), proto.getLength());
+ proto.getStartOffset(), proto.getLength(),
+ proto.getHostsList().toArray(new String[]{}),
+ diskIds);
if (proto.hasDistCached() && proto.getDistCached()) {
distCached = true;
}
}
private void set(String tableName, Path path, TableMeta meta, long start,
- long length) {
+ long length, String[] hosts, int[] diskIds) {
this.tableName = tableName;
this.uri = path;
this.meta = meta;
this.startOffset = start;
this.length = length;
+ this.hosts = hosts;
+ this.diskIds = diskIds;
}
@@ -234,6 +245,9 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
frag.uri = uri;
frag.meta = (TableMeta) (meta != null ? meta.clone() : null);
frag.distCached = distCached;
+ frag.diskIds = diskIds;
+ frag.hosts = hosts;
+ frag.hostsBlockCount = hostsBlockCount;
return frag;
}
@@ -256,6 +270,17 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
builder.setLength(this.length);
builder.setPath(this.uri.toString());
builder.setDistCached(this.distCached);
+ if(diskIds != null) {
+ List<Integer> idList = new ArrayList<Integer>();
+ for(int eachId: diskIds) {
+ idList.add(eachId);
+ }
+ builder.addAllDiskIds(idList);
+ }
+
+ if(hosts != null) {
+ builder.addAllHosts(TUtil.newList(hosts));
+ }
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index a7a1e4a..582c64f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
import java.io.IOException;
import java.util.ArrayList;
@@ -61,7 +62,7 @@ public class MergeScanner implements Scanner {
currentScanner.close();
}
currentFragment = iterator.next();
- currentScanner = StorageManager.getScanner(conf, meta, currentFragment);
+ currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
currentScanner.init();
return currentScanner.next();
} else {
@@ -74,7 +75,7 @@ public class MergeScanner implements Scanner {
iterator = fragments.iterator();
if (iterator.hasNext()) {
currentFragment = iterator.next();
- currentScanner = StorageManager.getScanner(conf, meta, currentFragment);
+ currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
}
}