You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:51 UTC
[37/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index c11db6f..925c047 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -30,12 +30,10 @@ import java.io.IOException;
public class TajoQueryEngine {
- private final StorageManager storageManager;
private final PhysicalPlanner phyPlanner;
public TajoQueryEngine(TajoConf conf) throws IOException {
- this.storageManager = StorageManager.getStorageManager(conf);
- this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager);
+ this.phyPlanner = new PhysicalPlannerImpl(conf);
}
public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index cb038df..00eabcc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.Schema;
@@ -53,10 +52,7 @@ import org.apache.tajo.plan.logical.*;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -108,39 +104,6 @@ public class Task {
private Schema finalSchema = null;
private TupleComparator sortComp = null;
- static final String OUTPUT_FILE_PREFIX="part-";
- static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
- new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(2);
- return fmt;
- }
- };
- static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
- new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(6);
- return fmt;
- }
- };
-
- static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
- new ThreadLocal<NumberFormat>() {
- @Override
- public NumberFormat initialValue() {
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(3);
- return fmt;
- }
- };
-
public Task(String taskRunnerId,
Path baseDir,
QueryUnitAttemptId taskId,
@@ -190,13 +153,8 @@ public class Task {
this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
- // The final result of a task will be written in a file named part-ss-nnnnnnn,
- // where ss is the subquery id associated with this task, and nnnnnn is the task id.
- Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
- OUTPUT_FILE_PREFIX +
- OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
- OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()) + "-" +
- OUTPUT_FILE_FORMAT_SEQ.get().format(0));
+ Path outFilePath = ((FileStorageManager)StorageManager.getFileStorageManager(systemConf))
+ .getAppenderFilePath(taskId, queryContext.getStagingDir());
LOG.info("Output File Path: " + outFilePath);
context.setOutputPath(outFilePath);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
index 18a67d8..49635d1 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
@@ -41,6 +41,7 @@
<%@ page import="java.text.SimpleDateFormat" %>
<%@ page import="java.util.Map" %>
<%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
<%
String paramQueryId = request.getParameter("queryId");
@@ -102,8 +103,8 @@
String fragmentInfo = "";
String delim = "";
for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) {
- FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
- fragmentInfo += delim + fileFragment.toString();
+ Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment);
+ fragmentInfo += delim + fragment.toString();
delim = "<br/>";
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 45d3c51..fb98be2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -47,7 +47,7 @@ public class BackendTestingUtil {
public static void writeTmpTable(TajoConf conf, Path tablePath)
throws IOException {
- StorageManager sm = StorageManager.getStorageManager(conf, tablePath);
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, tablePath);
FileSystem fs = sm.getFileSystem();
Appender appender;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
new file mode 100644
index 0000000..a8e4a5c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.tajo.util.Bytes;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
+
+public class HBaseTestClusterUtil {
+ private static final Log LOG = LogFactory.getLog(HBaseTestClusterUtil.class);
+ private Configuration conf;
+ private MiniHBaseCluster hbaseCluster;
+ private MiniZooKeeperCluster zkCluster;
+ private File testBaseDir;
+ public HBaseTestClusterUtil(Configuration conf, File testBaseDir) {
+ this.conf = conf;
+ this.testBaseDir = testBaseDir;
+ }
+ /**
+ * Returns the path to the default root dir the minicluster uses.
+ * Note: this does not cause the root dir to be created.
+ * @return Fully qualified path for the default hbase root dir
+ * @throws java.io.IOException
+ */
+ public Path getDefaultRootDirPath() throws IOException {
+ FileSystem fs = FileSystem.get(this.conf);
+ return new Path(fs.makeQualified(fs.getHomeDirectory()),"hbase");
+ }
+
+ /**
+ * Creates an hbase rootdir in user home directory. Also creates hbase
+ * version file. Normally you won't make use of this method. Root hbasedir
+ * is created for you as part of mini cluster startup. You'd only use this
+ * method if you were doing manual operation.
+ * @return Fully qualified path to hbase root dir
+ * @throws java.io.IOException
+ */
+ public Path createRootDir() throws IOException {
+ FileSystem fs = FileSystem.get(this.conf);
+ Path hbaseRootdir = getDefaultRootDirPath();
+ FSUtils.setRootDir(this.conf, hbaseRootdir);
+ fs.mkdirs(hbaseRootdir);
+ FSUtils.setVersion(fs, hbaseRootdir);
+ return hbaseRootdir;
+ }
+
+ public void stopHBaseCluster() throws IOException {
+ if (hbaseCluster != null) {
+ LOG.info("MiniHBaseCluster stopped");
+ hbaseCluster.shutdown();
+ hbaseCluster.waitUntilShutDown();
+ hbaseCluster = null;
+ }
+ }
+
+ public void startHBaseCluster() throws Exception {
+ if (zkCluster == null) {
+ startMiniZKCluster();
+ }
+ if (hbaseCluster != null) {
+ return;
+ }
+
+ System.setProperty("HBASE_ZNODE_FILE", testBaseDir + "/hbase_znode_file");
+ if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
+ conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+ }
+ if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
+ conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
+ }
+ conf.setBoolean(REPLICATION_ENABLE_KEY, false);
+ createRootDir();
+
+ Configuration c = HBaseConfiguration.create(this.conf);
+
+ hbaseCluster = new MiniHBaseCluster(c, 1);
+
+ // Don't leave here till we've done a successful scan of the hbase:meta
+ HTable t = new HTable(c, TableName.META_TABLE_NAME);
+ ResultScanner s = t.getScanner(new Scan());
+ while (s.next() != null) {
+ continue;
+ }
+ s.close();
+ t.close();
+ LOG.info("MiniHBaseCluster started");
+
+ }
+
+ /**
+ * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set
+ * the port mentionned is used as the default port for ZooKeeper.
+ */
+ public MiniZooKeeperCluster startMiniZKCluster()
+ throws Exception {
+ File zkDataPath = new File(testBaseDir, "zk");
+ if (this.zkCluster != null) {
+ throw new IOException("Cluster already running at " + zkDataPath);
+ }
+ this.zkCluster = new MiniZooKeeperCluster(conf);
+ final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
+ if (defPort > 0){
+ // If there is a port in the config file, we use it.
+ this.zkCluster.setDefaultClientPort(defPort);
+ }
+ int clientPort = this.zkCluster.startup(zkDataPath, 1);
+ this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
+ LOG.info("MiniZooKeeperCluster started");
+
+ return this.zkCluster;
+ }
+
+ public void stopZooKeeperCluster() throws IOException {
+ if (zkCluster != null) {
+ LOG.info("MiniZooKeeperCluster stopped");
+ zkCluster.shutdown();
+ zkCluster = null;
+ }
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public MiniZooKeeperCluster getMiniZooKeeperCluster() {
+ return zkCluster;
+ }
+
+ public MiniHBaseCluster getMiniHBaseCluster() {
+ return hbaseCluster;
+ }
+
+ public HTableDescriptor getTableDescriptor(String tableName) throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ try {
+ return admin.getTableDescriptor(Bytes.toBytes(tableName));
+ } finally {
+ admin.close();
+ }
+ }
+
+ public void createTable(HTableDescriptor hTableDesc) throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ try {
+ admin.createTable(hTableDesc);
+ } finally {
+ admin.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index ecfb9f5..875e450 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -169,9 +169,9 @@ public class QueryTestCaseBase {
private static String currentDatabase;
private static Set<String> createdTableGlobalSet = new HashSet<String>();
// queries and results directory corresponding to subclass class.
- private Path currentQueryPath;
- private Path currentResultPath;
- private Path currentDatasetPath;
+ protected Path currentQueryPath;
+ protected Path currentResultPath;
+ protected Path currentDatasetPath;
// for getting a method name
@Rule public TestName name = new TestName();
@@ -303,7 +303,7 @@ public class QueryTestCaseBase {
return executeFile(getMethodName() + ".sql");
}
- private String getMethodName() {
+ protected String getMethodName() {
String methodName = name.getMethodName();
// In the case of parameter execution name's pattern is methodName[0]
if (methodName.endsWith("]")) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 757ba0f..64c27e0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -70,6 +70,7 @@ public class TajoTestingCluster {
private FileSystem defaultFS;
private MiniDFSCluster dfsCluster;
private MiniCatalogServer catalogServer;
+ private HBaseTestClusterUtil hbaseUtil;
private TajoMaster tajoMaster;
private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
@@ -284,6 +285,10 @@ public class TajoTestingCluster {
return this.defaultFS;
}
+ public HBaseTestClusterUtil getHBaseUtil() {
+ return hbaseUtil;
+ }
+
////////////////////////////////////////////////////////
// Catalog Section
////////////////////////////////////////////////////////
@@ -507,6 +512,8 @@ public class TajoTestingCluster {
startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
this.dfsCluster.waitClusterUp();
+ hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
+
if(!standbyWorkerMode) {
startMiniYarnCluster();
}
@@ -594,7 +601,6 @@ public class TajoTestingCluster {
}
if(this.dfsCluster != null) {
-
try {
FileSystem fs = this.dfsCluster.getFileSystem();
if (fs != null) fs.close();
@@ -613,6 +619,10 @@ public class TajoTestingCluster {
}
this.clusterTestBuildDir = null;
}
+
+ hbaseUtil.stopZooKeeperCluster();
+ hbaseUtil.stopHBaseCluster();
+
LOG.info("Minicluster is down");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 3437e3a..9ce7b5b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -40,10 +40,10 @@ import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
@@ -380,7 +380,7 @@ public class TestPlannerUtil {
int index = 0;
for (int i = startIndex; i < startIndex + expectedSize; i++, index++) {
- FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[index]);
+ FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), fragments[index]);
assertEquals(expectedFiles.get(i), fragment.getPath());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
index 0e7f3e6..3803c7a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -42,10 +42,7 @@ import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.After;
import org.junit.Before;
@@ -140,8 +137,8 @@ public class TestBroadcastJoinPlan {
contentsData += j;
}
}
- Appender appender = StorageManager.getStorageManager(conf).getAppender(tableMeta, schema,
- dataPath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(tableMeta, schema, dataPath);
appender.init();
Tuple tuple = new VTuple(schema.size());
int writtenSize = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index aef8064..6a6aafb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -79,7 +79,7 @@ public class TestBNLJoinExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerid", Type.INT4);
@@ -89,7 +89,8 @@ public class TestBNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(employeeMeta, schema, employeePath);
appender.init();
Tuple tuple = new VTuple(schema.size());
for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
@@ -110,7 +111,8 @@ public class TestBNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
@@ -150,10 +152,10 @@ public class TestBNLJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
- FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+ FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
new Path(employee.getPath()),
Integer.MAX_VALUE);
- FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+ FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
new Path(people.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -162,7 +164,7 @@ public class TestBNLJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -183,9 +185,9 @@ public class TestBNLJoinExec {
LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
context).getRootBlock().getRoot();
- FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+ FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
- FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+ FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
new Path(people.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -201,7 +203,7 @@ public class TestBNLJoinExec {
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 60ae849..dc3c28d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -66,7 +66,7 @@ public class TestBSTIndexExec {
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private StorageManager sm;
+ private FileStorageManager sm;
private Schema idxSchema;
private BaseTupleComparator comp;
private BSTIndex.BSTIndexWriter writer;
@@ -91,7 +91,7 @@ public class TestBSTIndexExec {
Path workDir = CommonTestingUtil.getTestDir();
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
- sm = StorageManager.getStorageManager(conf, workDir);
+ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir);
idxPath = new Path(workDir, "test.idx");
@@ -117,8 +117,7 @@ public class TestBSTIndexExec {
fs = tablePath.getFileSystem(conf);
fs.mkdirs(tablePath.getParent());
- FileAppender appender = (FileAppender)StorageManager.getStorageManager(conf).getAppender(meta, schema,
- tablePath);
+ FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple = new VTuple(schema.size());
for (int i = 0; i < 10000; i++) {
@@ -164,7 +163,7 @@ public class TestBSTIndexExec {
this.rndKey = rnd.nextInt(250);
final String QUERY = "select * from employee where managerId = " + rndKey;
- FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
+ FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
@@ -172,7 +171,7 @@ public class TestBSTIndexExec {
LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
LogicalNode rootNode = optimizer.optimize(plan);
- TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
+ TmpPlanner phyPlanner = new TmpPlanner(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
int tupleCount = this.randomValues.get(rndKey);
@@ -186,8 +185,8 @@ public class TestBSTIndexExec {
}
private class TmpPlanner extends PhysicalPlannerImpl {
- public TmpPlanner(TajoConf conf, StorageManager sm) {
- super(conf, sm);
+ public TmpPlanner(TajoConf conf) {
+ super(conf);
}
@Override
@@ -196,12 +195,11 @@ public class TestBSTIndexExec {
Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
"Error: There is no table matched to %s", scanNode.getTableName());
- List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(),
- ctx.getTables(scanNode.getTableName()));
+ List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName()));
Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
- return new BSTIndexScanExec(ctx, sm, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
+ return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index bb40b28..c0bf6ce 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -59,7 +59,7 @@ public class TestExternalSortExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private FileStorageManager sm;
private Path testDir;
private final int numTuple = 100000;
@@ -76,7 +76,7 @@ public class TestExternalSortExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir);
Schema schema = new Schema();
schema.addColumn("managerid", Type.INT4);
@@ -85,7 +85,8 @@ public class TestExternalSortExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(employeeMeta, schema, employeePath);
appender.enableStats();
appender.init();
Tuple tuple = new VTuple(schema.size());
@@ -121,7 +122,7 @@ public class TestExternalSortExec {
@Test
public final void testNext() throws IOException, PlanningException {
- FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+ FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -131,7 +132,7 @@ public class TestExternalSortExec {
LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
ProjectionExec proj = (ProjectionExec) exec;
@@ -141,8 +142,7 @@ public class TestExternalSortExec {
UnaryPhysicalExec sortExec = proj.getChild();
SeqScanExec scan = sortExec.getChild();
- ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
- ((MemSortExec)sortExec).getPlan(), scan);
+ ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan);
proj.setChild(extSort);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index c4ce43b..ecd1c23 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -61,7 +61,7 @@ public class TestFullOuterHashJoinExec {
private CatalogService catalog;
private SQLAnalyzer analyzer;
private LogicalPlanner planner;
- private StorageManager sm;
+ private FileStorageManager sm;
private Path testDir;
private QueryContext defaultContext;
@@ -84,7 +84,7 @@ public class TestFullOuterHashJoinExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir);
//----------------- dep3 ------------------------------
// dep_id | dep_name | loc_id
@@ -107,7 +107,8 @@ public class TestFullOuterHashJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
Tuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -136,7 +137,8 @@ public class TestFullOuterHashJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+ Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
Tuple tuple2 = new VTuple(job3Schema.size());
for (int i = 1; i < 4; i++) {
@@ -175,7 +177,8 @@ public class TestFullOuterHashJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+ Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -227,8 +230,8 @@ public class TestFullOuterHashJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
- phone3Path);
+ Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
appender5.flush();
@@ -266,9 +269,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+ FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
@@ -277,7 +280,7 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -305,9 +308,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
@@ -316,7 +319,7 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -343,9 +346,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -354,7 +357,7 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -382,9 +385,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
@@ -394,7 +397,7 @@ public class TestFullOuterHashJoinExec {
workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index a82de92..a81979f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -40,10 +40,7 @@ import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -92,7 +89,7 @@ public class TestFullOuterMergeJoinExec {
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
//----------------- dep3 ------------------------------
// dep_id | dep_name | loc_id
@@ -115,7 +112,8 @@ public class TestFullOuterMergeJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
Tuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -153,7 +151,8 @@ public class TestFullOuterMergeJoinExec {
TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path dep4Path = new Path(testDir, "dep4.csv");
- Appender appender4 = StorageManager.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+ Appender appender4 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(dep4Meta, dep4Schema, dep4Path);
appender4.init();
Tuple tuple4 = new VTuple(dep4Schema.size());
for (int i = 0; i < 11; i++) {
@@ -184,7 +183,8 @@ public class TestFullOuterMergeJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+ Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
Tuple tuple2 = new VTuple(job3Schema.size());
for (int i = 1; i < 4; i++) {
@@ -223,7 +223,8 @@ public class TestFullOuterMergeJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+ Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -275,8 +276,8 @@ public class TestFullOuterMergeJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
- phone3Path);
+ Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
appender5.flush();
appender5.close();
@@ -318,9 +319,9 @@ public class TestFullOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] dep3Frags =
- StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
@@ -328,7 +329,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -355,9 +356,9 @@ public class TestFullOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] job3Frags =
- StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
@@ -365,7 +366,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -392,9 +393,9 @@ public class TestFullOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] job3Frags =
- StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
@@ -402,7 +403,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -430,9 +431,9 @@ public class TestFullOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] dep4Frags =
- StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
@@ -440,7 +441,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -470,9 +471,9 @@ public class TestFullOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] phone3Frags =
- StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
@@ -481,7 +482,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -509,9 +510,9 @@ public class TestFullOuterMergeJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
FileFragment[] emp3Frags =
- StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+ FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] phone3Frags =
- StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
@@ -520,7 +521,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -537,9 +538,4 @@ public class TestFullOuterMergeJoinExec {
exec.close();
assertEquals(7, count);
}
-
-
-
-
-
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 20d4651..4fe6ff2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -76,7 +76,7 @@ public class TestHashAntiJoinExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerid", Type.INT4);
@@ -86,8 +86,8 @@ public class TestHashAntiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
- employeePath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeSchema.size());
@@ -112,7 +112,8 @@ public class TestHashAntiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
for (int i = 1; i < 10; i += 2) {
@@ -150,9 +151,9 @@ public class TestHashAntiJoinExec {
@Test
public final void testHashAntiJoin() throws IOException, PlanningException {
- FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+ FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
- FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+ FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
new Path(people.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -166,7 +167,7 @@ public class TestHashAntiJoinExec {
optimizer.optimize(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
// replace an equal join with an hash anti join.
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index d1fa28a..55e87d4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -78,7 +78,7 @@ public class TestHashJoinExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerid", Type.INT4);
@@ -88,8 +88,8 @@ public class TestHashJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
- employeePath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeSchema.size());
for (int i = 0; i < 10; i++) {
@@ -111,7 +111,8 @@ public class TestHashJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
for (int i = 1; i < 10; i += 2) {
@@ -152,9 +153,9 @@ public class TestHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+ FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
- FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+ FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
new Path(people.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -163,7 +164,7 @@ public class TestHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -195,9 +196,9 @@ public class TestHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+ FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
new Path(people.getPath()), Integer.MAX_VALUE);
- FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+ FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -207,7 +208,7 @@ public class TestHashJoinExec {
ctx.setEnforcer(enforcer);
ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l);
- PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 7a43a55..a2f1155 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -38,10 +38,7 @@ import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -80,7 +77,7 @@ public class TestHashSemiJoinExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerid", Type.INT4);
@@ -90,8 +87,8 @@ public class TestHashSemiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
- employeePath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeSchema.size());
@@ -116,7 +113,8 @@ public class TestHashSemiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
// make 27 tuples
@@ -158,9 +156,9 @@ public class TestHashSemiJoinExec {
@Test
public final void testHashSemiJoin() throws IOException, PlanningException {
- FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+ FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
- FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+ FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
new Path(people.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -174,7 +172,7 @@ public class TestHashSemiJoinExec {
optimizer.optimize(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
// replace an equal join with an hash anti join.
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index ec9daa7..0477771 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -85,7 +85,7 @@ public class TestLeftOuterHashJoinExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
//----------------- dep3 ------------------------------
// dep_id | dep_name | loc_id
@@ -108,7 +108,8 @@ public class TestLeftOuterHashJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
Tuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -137,7 +138,8 @@ public class TestLeftOuterHashJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+ Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
Tuple tuple2 = new VTuple(job3Schema.size());
for (int i = 1; i < 4; i++) {
@@ -176,7 +178,8 @@ public class TestLeftOuterHashJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+ Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -228,8 +231,8 @@ public class TestLeftOuterHashJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
- phone3Path);
+ Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
appender5.flush();
@@ -270,9 +273,9 @@ public class TestLeftOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(),
+ FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(),
new Path(dep3.getPath()), Integer.MAX_VALUE);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
@@ -281,7 +284,7 @@ public class TestLeftOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -300,9 +303,9 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
- FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
+ FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
new Path(job3.getPath()), Integer.MAX_VALUE);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
new Path(emp3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
@@ -314,7 +317,7 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[1]);
LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -341,9 +344,9 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
new Path(emp3.getPath()), Integer.MAX_VALUE);
- FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
+ FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
new Path(job3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -355,7 +358,7 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[2]);
LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -383,9 +386,9 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
new Path(emp3.getPath()), Integer.MAX_VALUE);
- FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(),
+ FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(),
new Path(phone3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
@@ -397,7 +400,7 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[3]);
LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
@@ -425,9 +428,9 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, "default.emp3", emp3.getMeta(),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, "default.emp3", emp3.getMeta(),
new Path(emp3.getPath()), Integer.MAX_VALUE);
- FileFragment[] phone3Frags = StorageManager.splitNG(conf, "default.phone3", phone3.getMeta(),
+ FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, "default.phone3", phone3.getMeta(),
new Path(phone3.getPath()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
@@ -439,7 +442,7 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[4]);
LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
ProjectionExec proj = (ProjectionExec) exec;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index b3d1f33..36dd77e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -81,7 +81,7 @@ public class TestLeftOuterNLJoinExec {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
//----------------- dep3 ------------------------------
// dep_id | dep_name | loc_id
@@ -104,7 +104,8 @@ public class TestLeftOuterNLJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
Tuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -133,7 +134,8 @@ public class TestLeftOuterNLJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+ Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
Tuple tuple2 = new VTuple(job3Schema.size());
for (int i = 1; i < 4; i++) {
@@ -172,7 +174,8 @@ public class TestLeftOuterNLJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+ Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -224,8 +227,8 @@ public class TestLeftOuterNLJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
- phone3Path);
+ Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
appender5.flush();
@@ -254,9 +257,9 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
- FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+ FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
@@ -269,7 +272,7 @@ public class TestLeftOuterNLJoinExec {
LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -295,9 +298,9 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
- FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
@@ -311,7 +314,7 @@ public class TestLeftOuterNLJoinExec {
LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -340,9 +343,9 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -355,7 +358,7 @@ public class TestLeftOuterNLJoinExec {
LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -385,9 +388,9 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
@@ -400,7 +403,7 @@ public class TestLeftOuterNLJoinExec {
LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -429,9 +432,9 @@ public class TestLeftOuterNLJoinExec {
@Test
public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
- FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
Integer.MAX_VALUE);
- FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
@@ -444,7 +447,7 @@ public class TestLeftOuterNLJoinExec {
LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -470,7 +473,4 @@ public class TestLeftOuterNLJoinExec {
exec.close();
assertEquals(0, count);
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index cae5de5..10d4d33 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -79,7 +79,7 @@ public class TestMergeJoinExec {
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
conf = util.getConfiguration();
FileSystem fs = testDir.getFileSystem(conf);
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
Schema employeeSchema = new Schema();
employeeSchema.addColumn("managerid", Type.INT4);
@@ -89,8 +89,8 @@ public class TestMergeJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
- employeePath);
+ Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
Tuple tuple = new VTuple(employeeSchema.size());
for (int i = 0; i < 10; i++) {
@@ -118,7 +118,8 @@ public class TestMergeJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
Path peoplePath = new Path(testDir, "people.csv");
- appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+ .getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
for (int i = 1; i < 10; i += 2) {
@@ -165,9 +166,9 @@ public class TestMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- FileFragment[] empFrags = sm.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()),
+ FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()),
Integer.MAX_VALUE);
- FileFragment[] peopleFrags = sm.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()),
+ FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -176,7 +177,7 @@ public class TestMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
PhysicalExec exec = phyPlanner.createPlan(ctx, root);
ProjectionExec proj = (ProjectionExec) exec;
assertTrue(proj.getChild() instanceof MergeJoinExec);