You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/25 02:55:50 UTC
[3/3] tajo git commit: TAJO-1616: Implement TablespaceManager to load
Tablespaces. (missed commits)
TAJO-1616: Implement TablespaceManager to load Tablespaces. (missed commits)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/90afaa46
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/90afaa46
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/90afaa46
Branch: refs/heads/master
Commit: 90afaa468080d4f743ed2eee8326a38995900807
Parents: d440727
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jun 24 17:55:10 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jun 24 17:55:10 2015 -0700
----------------------------------------------------------------------
.travis.yml | 4 +-
.../apache/tajo/storage/StorageConstants.java | 3 +
.../java/org/apache/tajo/util/KeyValueSet.java | 8 +-
.../engine/planner/PhysicalPlannerImpl.java | 6 +-
.../planner/physical/ColPartitionStoreExec.java | 2 +-
.../engine/planner/physical/InsertRowsExec.java | 107 +++++
.../physical/RangeShuffleFileWriteExec.java | 2 +-
.../engine/planner/physical/SeqScanExec.java | 2 +-
.../engine/planner/physical/StoreTableExec.java | 10 +-
.../apache/tajo/engine/query/QueryContext.java | 4 +-
.../org/apache/tajo/master/GlobalEngine.java | 10 +-
.../apache/tajo/master/exec/DDLExecutor.java | 10 +-
.../exec/NonForwardQueryResultFileScanner.java | 2 +-
.../apache/tajo/master/exec/QueryExecutor.java | 179 +++++----
.../master/exec/prehook/CreateTableHook.java | 2 -
.../java/org/apache/tajo/querymaster/Query.java | 6 +-
.../tajo/querymaster/QueryMasterTask.java | 118 +-----
.../apache/tajo/querymaster/Repartitioner.java | 10 +-
.../java/org/apache/tajo/querymaster/Stage.java | 4 +-
.../org/apache/tajo/worker/LegacyTaskImpl.java | 2 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 2 +-
.../src/main/resources/webapps/admin/index.jsp | 4 +-
.../org/apache/tajo/BackendTestingUtil.java | 2 +-
.../java/org/apache/tajo/QueryTestCaseBase.java | 35 ++
.../org/apache/tajo/TajoTestingCluster.java | 18 +-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +-
.../apache/tajo/engine/eval/ExprTestBase.java | 5 +-
.../tajo/engine/eval/TestEvalTreeUtil.java | 5 +-
.../engine/planner/TestLogicalOptimizer.java | 4 +-
.../tajo/engine/planner/TestLogicalPlan.java | 4 +-
.../tajo/engine/planner/TestLogicalPlanner.java | 4 +-
.../tajo/engine/planner/TestPlannerUtil.java | 4 +-
.../planner/physical/TestBNLJoinExec.java | 6 +-
.../planner/physical/TestBSTIndexExec.java | 4 +-
.../planner/physical/TestExternalSortExec.java | 4 +-
.../physical/TestFullOuterHashJoinExec.java | 10 +-
.../physical/TestFullOuterMergeJoinExec.java | 12 +-
.../planner/physical/TestHashAntiJoinExec.java | 6 +-
.../planner/physical/TestHashJoinExec.java | 6 +-
.../planner/physical/TestHashSemiJoinExec.java | 6 +-
.../physical/TestLeftOuterHashJoinExec.java | 10 +-
.../planner/physical/TestMergeJoinExec.java | 6 +-
.../engine/planner/physical/TestNLJoinExec.java | 6 +-
.../planner/physical/TestPhysicalPlanner.java | 12 +-
.../physical/TestProgressExternalSortExec.java | 4 +-
.../physical/TestRightOuterHashJoinExec.java | 8 +-
.../physical/TestRightOuterMergeJoinExec.java | 12 +-
.../engine/planner/physical/TestSortExec.java | 6 +-
.../tajo/engine/query/TestHBaseTable.java | 62 ++-
.../apache/tajo/engine/query/TestJoinQuery.java | 2 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 88 +++--
.../org/apache/tajo/jdbc/TestResultSet.java | 2 +-
.../tajo/master/TestExecutionBlockCursor.java | 4 +-
.../TestNonForwardQueryResultSystemScanner.java | 258 +-----------
.../apache/tajo/querymaster/TestKillQuery.java | 6 +-
.../org/apache/tajo/storage/TestRowFile.java | 2 +-
.../TestHBaseTable/testInsertValues1.result | 4 +
.../testGetClusterDetails.result | 4 +
.../testGetNextRowsForAggregateFunction.result | 3 +
.../testGetNextRowsForTable.result | 5 +
.../java/org/apache/tajo/plan/LogicalPlan.java | 8 +
.../org/apache/tajo/plan/util/PlannerUtil.java | 16 +
.../org/apache/tajo/storage/FormatProperty.java | 42 +-
.../org/apache/tajo/storage/MergeScanner.java | 4 +-
.../apache/tajo/storage/OldStorageManager.java | 3 +-
.../apache/tajo/storage/StorageProperty.java | 42 +-
.../apache/tajo/storage/TableSpaceManager.java | 390 -------------------
.../org/apache/tajo/storage/Tablespace.java | 19 +-
.../apache/tajo/storage/TablespaceManager.java | 390 +++++++++++++++++++
.../tajo/storage/hbase/HBasePutAppender.java | 4 +-
.../apache/tajo/storage/hbase/HBaseScanner.java | 4 +-
.../tajo/storage/hbase/HBaseTablespace.java | 48 ++-
.../tajo/storage/hbase/TestHBaseTableSpace.java | 10 +-
.../org/apache/tajo/storage/FileAppender.java | 2 +-
.../org/apache/tajo/storage/FileTablespace.java | 138 +++++--
.../storage/HashShuffleAppenderManager.java | 2 +-
.../tajo/storage/TestCompressionStorages.java | 4 +-
.../tajo/storage/TestDelimitedTextFile.java | 8 +-
.../apache/tajo/storage/TestFileSystems.java | 2 +-
.../apache/tajo/storage/TestFileTablespace.java | 12 +-
.../org/apache/tajo/storage/TestLineReader.java | 8 +-
.../apache/tajo/storage/TestMergeScanner.java | 6 +-
.../org/apache/tajo/storage/TestStorages.java | 44 +--
.../apache/tajo/storage/index/TestBSTIndex.java | 20 +-
.../index/TestSingleCSVFileBSTIndex.java | 4 +-
.../apache/tajo/storage/json/TestJsonSerDe.java | 2 +-
86 files changed, 1236 insertions(+), 1136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 8fc9b94..61d56fb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,7 +19,7 @@ git:
depth: 150
jdk:
- - openjdk7
+ - oraclejdk7
env: PATH=$PATH:$HOME/local/bin
@@ -33,7 +33,7 @@ notifications:
- issues@tajo.apache.org
irc: "chat.freenode.net#tajo"
-
+before_install: ulimit -t 514029
install: ./dev-support/travis-install-dependencies.sh
script:
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index a9923a5..16cf51d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -30,6 +30,9 @@ public class StorageConstants {
// Common table properties -------------------------------------------------
+ // Insert
+ public static final String INSERT_DIRECTLY = "insert.direct";
+
// time zone
public static final String TIMEZONE = "timezone";
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 0e27769..404606d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -115,6 +115,10 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
return get(key, null);
}
+ public boolean isTrue(String key) {
+ return getBool(key, false);
+ }
+
public void setBool(String key, boolean val) {
set(key, val ? TRUE_STR : FALSE_STR);
}
@@ -137,9 +141,9 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
public boolean getBool(ConfigKey key) {
String keyName = key.keyname();
if (key instanceof SessionVars) {
- return getBool(keyName, ((SessionVars)key).getConfVars().defaultBoolVal);
+ return getBool(keyName, ((SessionVars) key).getConfVars().defaultBoolVal);
} else if (key instanceof TajoConf.ConfVars) {
- return getBool(keyName, ((TajoConf.ConfVars)key).defaultBoolVal);
+ return getBool(keyName, ((TajoConf.ConfVars) key).defaultBoolVal);
}
return getBool(keyName);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index f0b2f5e..c6b9b41 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -252,7 +252,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
FragmentProto[] fragmentProtos = ctx.getTables(tableId);
List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
for (Fragment frag : fragments) {
- size += TableSpaceManager.guessFragmentVolume(ctx.getConf(), frag);
+ size += TablespaceManager.guessFragmentVolume(ctx.getConf(), frag);
}
}
return size;
@@ -926,7 +926,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
List<Fragment> fileFragments = TUtil.newList();
- FileTablespace space = (FileTablespace) TableSpaceManager.get(scanNode.getTableDesc().getUri()).get();
+ FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri()).get();
for (Path path : partitionedTableScanNode.getInputPaths()) {
fileFragments.addAll(TUtil.newList(space.split(scanNode.getCanonicalName(), path)));
}
@@ -1190,7 +1190,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
- FileTablespace sm = (FileTablespace) TableSpaceManager.get(fragments.get(0).getPath().toUri()).get();
+ FileTablespace sm = (FileTablespace) TablespaceManager.get(fragments.get(0).getPath().toUri()).get();
String dbName = CatalogUtil.extractQualifier(annotation.getTableName());
String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName());
Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index");
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 969998c..76abc6d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -165,7 +165,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
actualFilePath = new Path(lastFileName + "_" + suffixId);
}
- appender = ((FileTablespace) TableSpaceManager.get(lastFileName.toUri()).get())
+ appender = ((FileTablespace) TablespaceManager.get(lastFileName.toUri()).get())
.getAppender(meta, outSchema, actualFilePath);
appender.enableStats();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java
new file mode 100644
index 0000000..f3a24a7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.plan.logical.PersistentStoreNode;
+import org.apache.tajo.plan.logical.StoreTableNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * This is a physical executor to store rows immediately.
+ */
+public class InsertRowsExec extends UnaryPhysicalExec {
+ private static final Log LOG = LogFactory.getLog(InsertRowsExec.class);
+
+ private PersistentStoreNode plan;
+ private TableMeta meta;
+ private Appender appender;
+ private Tuple tuple;
+
+ // for file punctuation
+ private TableStats sumStats; // for aggregating all stats of written files
+
+ public InsertRowsExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException {
+ super(context, plan.getInSchema(), plan.getOutSchema(), child);
+ this.plan = plan;
+ }
+
+ public void init() throws IOException {
+ super.init();
+
+ if (plan.hasOptions()) {
+ meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
+ } else {
+ meta = CatalogUtil.newTableMeta(plan.getStorageType());
+ }
+
+ PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta);
+ sumStats = new TableStats();
+
+ StoreTableNode storeTableNode = (StoreTableNode) plan;
+ appender = TablespaceManager.get(storeTableNode.getUri()).get().getAppenderForInsertRow(
+ context.getQueryContext(),
+ context.getTaskId(), meta, storeTableNode.getTableSchema(), context.getOutputPath());
+ appender.enableStats();
+ appender.init();
+ }
+
+ /* (non-Javadoc)
+ * @see PhysicalExec#next()
+ */
+ @Override
+ public Tuple next() throws IOException {
+ while((tuple = child.next()) != null) {
+ appender.addTuple(tuple);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ // nothing to do
+ }
+
+ public void close() throws IOException {
+ super.close();
+
+ if(appender != null){
+ appender.flush();
+ appender.close();
+
+ // Collect statistics data
+ StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
+ context.setResultStats(sumStats);
+ }
+
+ appender = null;
+ plan = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index fb29e4f..bbb21fe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -77,7 +77,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
context.getDataChannel().getStoreType() : "RAW");
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
- this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getDefault())
+ this.appender = (FileAppender) ((FileTablespace) TablespaceManager.getDefault())
.getAppender(meta, outSchema, new Path(storeTablePath, "output"));
this.appender.enableStats();
this.appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index d2ae3bd..599f160 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -202,7 +202,7 @@ public class SeqScanExec extends ScanExec {
FragmentConvertor.convert(context.getConf(), fragments), projected
);
} else {
- Tablespace tablespace = TableSpaceManager.get(plan.getTableDesc().getUri()).get();
+ Tablespace tablespace = TablespaceManager.get(plan.getTableDesc().getUri()).get();
this.scanner = tablespace.getScanner(meta, plan.getPhysicalSchema(), fragments[0], projected);
}
scanner.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index dd8768e..6031fdb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -31,12 +31,14 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.plan.logical.InsertNode;
import org.apache.tajo.plan.logical.PersistentStoreNode;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
-import java.net.URI;
/**
* This is a physical executor to store a table part into a specified storage.
@@ -92,7 +94,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
lastFileName = new Path(lastFileName + "_" + suffixId);
}
- Optional<FileTablespace> spaceRes = TableSpaceManager.get(lastFileName.toUri());
+ Optional<FileTablespace> spaceRes = TablespaceManager.get(lastFileName.toUri());
if (!spaceRes.isPresent()) {
throw new IllegalStateException("No Tablespace for " + lastFileName.toUri());
}
@@ -106,7 +108,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
}
} else {
Path stagingDir = context.getQueryContext().getStagingDir();
- appender = TableSpaceManager.get(stagingDir.toUri()).get().getAppender(
+ appender = TablespaceManager.get(stagingDir.toUri()).get().getAppender(
context.getQueryContext(),
context.getTaskId(),
meta,
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 7696c6c..da2f2ad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -99,8 +99,8 @@ public class QueryContext extends OverridableConf {
return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null;
}
- public void setStagingDir(Path path) {
- put(QueryVars.STAGING_DIR, path.toUri().toString());
+ public void setStagingDir(URI uri) {
+ put(QueryVars.STAGING_DIR, uri.toString());
}
public Path getStagingDir() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index e833884..37b497c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -53,7 +53,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
import org.apache.tajo.plan.verifier.VerificationState;
import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import java.io.IOException;
@@ -91,7 +91,7 @@ public class GlobalEngine extends AbstractService {
try {
analyzer = new SQLAnalyzer();
preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
- planner = new LogicalPlanner(context.getCatalog(), TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(context.getConf());
annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
} catch (Throwable t) {
@@ -141,8 +141,8 @@ public class GlobalEngine extends AbstractService {
QueryContext newQueryContext = new QueryContext(context.getConf(), session);
// Set default space uri and its root uri
- newQueryContext.setDefaultSpaceUri(TableSpaceManager.getDefault().getUri());
- newQueryContext.setDefaultSpaceRootUri(TableSpaceManager.getDefault().getRootUri());
+ newQueryContext.setDefaultSpaceUri(TablespaceManager.getDefault().getUri());
+ newQueryContext.setDefaultSpaceRootUri(TablespaceManager.getDefault().getRootUri());
String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY);
if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) {
@@ -303,7 +303,7 @@ public class GlobalEngine extends AbstractService {
InsertNode iNode = rootNode.getChild();
Schema outSchema = iNode.getChild().getOutSchema();
- TableSpaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema);
+ TablespaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema);
} catch (Throwable t) {
state.addVerification(t.getMessage());
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 5e0e639..7104412 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -36,7 +36,7 @@ import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.StorageUtil;
@@ -251,11 +251,11 @@ public class DDLExecutor {
Tablespace tableSpace;
if (tableSpaceName != null) {
- tableSpace = TableSpaceManager.getByName(tableSpaceName).get();
+ tableSpace = TablespaceManager.getByName(tableSpaceName).get();
} else if (uri != null) {
- tableSpace = TableSpaceManager.get(uri).get();
+ tableSpace = TablespaceManager.get(uri).get();
} else {
- tableSpace = TableSpaceManager.getDefault();
+ tableSpace = TablespaceManager.getDefault();
}
TableDesc desc;
@@ -313,7 +313,7 @@ public class DDLExecutor {
if (purge) {
try {
- TableSpaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc);
+ TablespaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc);
} catch (IOException e) {
throw new InternalError(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index ae57453..ec8760f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -101,7 +101,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
}
private void initSeqScanExec() throws IOException {
- Tablespace tablespace = TableSpaceManager.get(tableDesc.getUri()).get();
+ Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()).get();
List<Fragment> fragments = null;
setPartition(tablespace);
fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 480f45c..5d42157 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
@@ -39,7 +40,7 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.physical.EvalExprExec;
-import org.apache.tajo.engine.planner.physical.StoreTableExec;
+import org.apache.tajo.engine.planner.physical.InsertRowsExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
@@ -60,14 +61,13 @@ import org.apache.tajo.plan.function.python.TajoScriptEngine;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.querymaster.Query;
-import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.ProtoUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -288,7 +288,7 @@ public class QueryExecutor {
boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
if (isInsert) {
InsertNode insertNode = rootNode.getChild();
- insertNonFromQuery(queryContext, insertNode, responseBuilder);
+ insertRowValues(queryContext, insertNode, responseBuilder);
} else {
Schema schema = PlannerUtil.targetToSchema(targets);
RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
@@ -330,89 +330,123 @@ public class QueryExecutor {
}
}
- private void insertNonFromQuery(QueryContext queryContext,
- InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
+ /**
+ * Insert rows through staging phase
+ */
+ private void insertRowsThroughStaging(TaskAttemptContext taskAttemptContext,
+ InsertNode insertNode,
+ Path finalOutputPath,
+ Path stagingDir,
+ Path stagingResultDir)
+ throws IOException {
+
+ EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+ InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
+
+ try {
+ exec.init();
+ exec.next();
+ } finally {
+ exec.close();
+ }
+
+ FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+
+ if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+ // it moves the original table into the temporary location.
+ // Then it moves the new result table into the original table location.
+ // Upon failed, it recovers the original table if possible.
+ boolean movedToOldTable = false;
+ boolean committed = false;
+ Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+ try {
+ if (fs.exists(finalOutputPath)) {
+ fs.rename(finalOutputPath, oldTableDir);
+ movedToOldTable = fs.exists(oldTableDir);
+ } else { // if the parent does not exist, make its parent directory.
+ fs.mkdirs(finalOutputPath.getParent());
+ }
+ fs.rename(stagingResultDir, finalOutputPath);
+ committed = fs.exists(finalOutputPath);
+ } catch (IOException ioe) {
+ // recover the old table
+ if (movedToOldTable && !committed) {
+ fs.rename(oldTableDir, finalOutputPath);
+ }
+ }
+ } else {
+ FileStatus[] files = fs.listStatus(stagingResultDir);
+ for (FileStatus eachFile : files) {
+ Path targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName());
+ if (fs.exists(targetFilePath)) {
+ targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+ }
+ fs.rename(eachFile.getPath(), targetFilePath);
+ }
+ }
+ }
+
+ /**
+ * Insert row values
+ */
+ private void insertRowValues(QueryContext queryContext,
+ InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
try {
String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
insertNode.getTableName();
String queryId = nodeUniqName + "_" + System.currentTimeMillis();
- FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
- Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
- Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+ URI finalOutputUri = insertNode.getUri();
+ Tablespace space = TablespaceManager.get(finalOutputUri).get();
+ TableMeta tableMeta = new TableMeta(insertNode.getStorageType(), insertNode.getOptions());
+ tableMeta.putOption(StorageConstants.INSERT_DIRECTLY, Boolean.TRUE.toString());
- TableDesc tableDesc = null;
- Path finalOutputDir;
- if (insertNode.getTableName() != null) {
- tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
- finalOutputDir = new Path(tableDesc.getUri());
- } else {
- finalOutputDir = new Path(insertNode.getUri());
- }
+ FormatProperty formatProperty = space.getFormatProperty(tableMeta);
- TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
- taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+ TaskAttemptContext taskAttemptContext;
+ if (formatProperty.directInsertSupported()) { // if this format and storage supports direct insertion
+ taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, null);
+ taskAttemptContext.setOutputPath(new Path(finalOutputUri));
- EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
- StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
- try {
- exec.init();
- exec.next();
- } finally {
- exec.close();
- }
+ EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+ InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
- if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
- // it moves the original table into the temporary location.
- // Then it moves the new result table into the original table location.
- // Upon failed, it recovers the original table if possible.
- boolean movedToOldTable = false;
- boolean committed = false;
- Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
try {
- if (fs.exists(finalOutputDir)) {
- fs.rename(finalOutputDir, oldTableDir);
- movedToOldTable = fs.exists(oldTableDir);
- } else { // if the parent does not exist, make its parent directory.
- fs.mkdirs(finalOutputDir.getParent());
- }
- fs.rename(stagingResultDir, finalOutputDir);
- committed = fs.exists(finalOutputDir);
- } catch (IOException ioe) {
- // recover the old table
- if (movedToOldTable && !committed) {
- fs.rename(oldTableDir, finalOutputDir);
- }
+ exec.init();
+ exec.next();
+ } finally {
+ exec.close();
}
} else {
- FileStatus[] files = fs.listStatus(stagingResultDir);
- for (FileStatus eachFile : files) {
- Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
- if (fs.exists(targetFilePath)) {
- targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
- }
- fs.rename(eachFile.getPath(), targetFilePath);
- }
+ URI stagingSpaceUri = space.prepareStagingSpace(context.getConf(), queryId, queryContext, tableMeta);
+ Path stagingDir = new Path(stagingSpaceUri);
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+
+ taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+ taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+ insertRowsThroughStaging(taskAttemptContext, insertNode, new Path(finalOutputUri), stagingDir, stagingResultDir);
}
- if (insertNode.hasTargetTable()) {
- TableStats stats = tableDesc.getStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
+ // set insert stats (how many rows and bytes)
+ TableStats stats = new TableStats();
+ stats.setNumBytes(taskAttemptContext.getResultStats().getNumBytes());
+ stats.setNumRows(taskAttemptContext.getResultStats().getNumRows());
+ if (insertNode.hasTargetTable()) {
CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
- builder.setTableName(tableDesc.getName());
+ builder.setTableName(insertNode.getTableName());
builder.setStats(stats.getProto());
catalog.updateTableStats(builder.build());
- responseBuilder.setTableDesc(tableDesc.getProto());
- } else {
- TableStats stats = new TableStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
+ TableDesc desc = new TableDesc(
+ insertNode.getTableName(),
+ insertNode.getTargetSchema(),
+ tableMeta,
+ finalOutputUri);
+ responseBuilder.setTableDesc(desc.getProto());
+
+ } else { // If INSERT INTO LOCATION
// Empty TableDesc
List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
@@ -445,11 +479,12 @@ public class QueryExecutor {
TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot());
if (tableDesc != null) {
- Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
- StorageProperty storageProperty = space.getProperty();
+ Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
+ FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta());
- if (!storageProperty.isInsertable()) {
- throw new VerifyException("Inserting into non-file storage is not supported.");
+ if (!formatProperty.isInsertable()) {
+ throw new VerifyException(
+ String.format("%s tablespace does not allow INSERT operation.", tableDesc.getUri().toString()));
}
space.prepareTable(rootNode.getChild());
@@ -487,7 +522,7 @@ public class QueryExecutor {
TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
if (tableDesc != null) {
- Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+ Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
space.rewritePlan(context, plan);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
index d490001..0c02b6e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
@@ -24,8 +24,6 @@ import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.CreateTableNode;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.storage.TableSpaceManager;
-import org.apache.tajo.storage.Tablespace;
public class CreateTableHook implements DistributedQueryHook {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 4fef02c..9d5838d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -48,7 +48,7 @@ import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.QueryHistory;
@@ -447,7 +447,7 @@ public class Query implements EventHandler<QueryEvent> {
QueryContext context = query.context.getQueryContext();
if (lastStage != null && context.hasOutputTableUri()) {
- Tablespace space = TableSpaceManager.get(context.getOutputTableUri()).get();
+ Tablespace space = TablespaceManager.get(context.getOutputTableUri()).get();
try {
LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
space.rollbackTable(rootNode.getChild());
@@ -470,7 +470,7 @@ public class Query implements EventHandler<QueryEvent> {
// If there is not tabledesc, it is a select query without insert or ctas.
// In this case, we should use default tablespace.
- Tablespace space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+ Tablespace space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
Path finalOutputDir = space.commitTable(
query.context.getQueryContext(),
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 84f2eac..1f5e7a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -18,15 +18,10 @@
package org.apache.tajo.querymaster;
-import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -61,6 +56,7 @@ import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
import java.io.IOException;
+import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -70,12 +66,6 @@ import static org.apache.tajo.TajoProtos.QueryState;
public class QueryMasterTask extends CompositeService {
private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
- // query submission directory is private!
- final public static FsPermission STAGING_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0700); // rwx--------
-
- public static final String TMP_STAGING_DIR_PREFIX = ".staging";
-
private QueryId queryId;
private Session session;
@@ -157,8 +147,6 @@ public class QueryMasterTask extends CompositeService {
dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
- initStagingDir();
-
queryMetrics = new TajoMetrics(queryId.toString());
super.init(systemConf);
@@ -303,8 +291,9 @@ public class QueryMasterTask extends CompositeService {
state == QueryState.QUERY_ERROR;
}
+ private LogicalPlan plan;
+
public synchronized void startQuery() {
- LogicalPlan plan = null;
Tablespace space = null;
try {
if (query != null) {
@@ -314,7 +303,7 @@ public class QueryMasterTask extends CompositeService {
CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
jsonExpr = null; // remove the possible OOM
@@ -322,10 +311,12 @@ public class QueryMasterTask extends CompositeService {
plan = planner.createPlan(queryContext, expr);
optimizer.optimize(queryContext, plan);
- // when a given uri is null, TableSpaceManager.get will return the default tablespace.
- space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+ // when a given uri is null, TablespaceManager.get will return the default tablespace.
+ space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
space.rewritePlan(queryContext, plan);
+ initStagingDir();
+
for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
if (scanNodes != null) {
@@ -367,94 +358,25 @@ public class QueryMasterTask extends CompositeService {
}
private void initStagingDir() throws IOException {
- Path stagingDir;
+ URI stagingDir;
try {
+ Tablespace tablespace = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+ TableDesc desc = PlannerUtil.getOutputTableDesc(plan);
- stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext);
+ FormatProperty formatProperty = tablespace.getFormatProperty(desc.getMeta());
+ if (formatProperty.isStagingSupport()) {
+ stagingDir = tablespace.prepareStagingSpace(systemConf, queryId.toString(), queryContext, desc.getMeta());
- // Create a subdirectories
- LOG.info("The staging dir '" + stagingDir + "' is created.");
- queryContext.setStagingDir(stagingDir);
- } catch (IOException ioe) {
- LOG.warn("Creating staging dir has been failed.", ioe);
-
- throw ioe;
- }
- }
-
- /**
- * It initializes the final output and staging directory and sets
- * them to variables.
- */
- public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException {
-
- String realUser;
- String currentUser;
- UserGroupInformation ugi;
- ugi = UserGroupInformation.getLoginUser();
- realUser = ugi.getShortUserName();
- currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
- FileSystem fs;
- Path stagingDir;
-
- ////////////////////////////////////////////
- // Create Output Directory
- ////////////////////////////////////////////
-
- String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
-
- // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
- // So, this query results won't be materialized as a part of a table.
- // The result will be temporarily written in the staging directory.
- if (outputPath.isEmpty()) {
- // for temporarily written in the storage directory
- stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
- } else {
- Optional<Tablespace> spaceResult = TableSpaceManager.get(outputPath);
- if (!spaceResult.isPresent()) {
- throw new IOException("No registered Tablespace for " + outputPath);
+ // Create a staging space
+ LOG.info("The staging dir '" + stagingDir + "' is created.");
+ queryContext.setStagingDir(stagingDir);
}
- Tablespace space = spaceResult.get();
- if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
- // If this space allows move operation, the staging directory will be underneath the final output table uri.
- stagingDir = StorageUtil.concatPath(context.getOutputTableUri().toString(), TMP_STAGING_DIR_PREFIX, queryId);
- } else {
- stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
- }
- }
-
- // initializ
- fs = stagingDir.getFileSystem(conf);
-
- if (fs.exists(stagingDir)) {
- throw new IOException("The staging directory '" + stagingDir + "' already exists");
- }
- fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
- FileStatus fsStatus = fs.getFileStatus(stagingDir);
- String owner = fsStatus.getOwner();
-
- if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
- throw new IOException("The ownership on the user's query " +
- "directory " + stagingDir + " is not as expected. " +
- "It is owned by " + owner + ". The directory must " +
- "be owned by the submitter " + currentUser + " or " +
- "by " + realUser);
- }
-
- if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
- LOG.info("Permissions on staging directory " + stagingDir + " are " +
- "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
- "to correct value " + STAGING_DIR_PERMISSION);
- fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ } catch (IOException ioe) {
+ LOG.warn("Creating staging space has been failed.", ioe);
+ throw ioe;
}
-
- Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
- fs.mkdirs(stagingResultDir);
-
- return stagingDir;
}
public Query getQuery() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 5b8f24a..f30fb64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -100,7 +100,7 @@ public class Repartitioner {
stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes();
}
- // TODO - We should remove dummy flagment usages
+ // TODO - We should remove dummy fragment usages
fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path("/dummy"), 0, 0,
new String[]{UNKNOWN_HOST});
@@ -115,7 +115,7 @@ public class Repartitioner {
// if table has no data, tablespace will return empty FileFragment.
// So, we need to handle FileFragment by its size.
// If we don't check its size, it can cause IndexOutOfBoundsException.
- Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+ Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc);
if (fileFragments.size() > 0) {
fragments[i] = fileFragments.get(0);
@@ -380,7 +380,7 @@ public class Repartitioner {
Path[] partitionScanPaths = null;
TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
- Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+ Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
@@ -507,7 +507,7 @@ public class Repartitioner {
Collection<Fragment> scanFragments;
Path[] partitionScanPaths = null;
- FileTablespace space = (FileTablespace) TableSpaceManager.get(desc.getUri()).get();
+ FileTablespace space = (FileTablespace) TablespaceManager.get(desc.getUri()).get();
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
@@ -645,7 +645,7 @@ public class Repartitioner {
PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
}
- Tablespace space = TableSpaceManager.getAnyByScheme(storeType).get();
+ Tablespace space = TablespaceManager.getAnyByScheme(storeType).get();
ranges = space.getInsertSortRanges(
stage.getContext().getQueryContext(),
tableDesc,
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index a7d605c..1163a6e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -60,7 +60,7 @@ import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
@@ -1084,7 +1084,7 @@ public class Stage implements EventHandler<StageEvent> {
Collection<Fragment> fragments;
TableMeta meta = table.getMeta();
- Tablespace tablespace = TableSpaceManager.get(scan.getTableDesc().getUri()).get();
+ Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri()).get();
// Depending on scanner node's type, it creates fragments. If scan is for
// a partitioned table, It will creates lots fragments for all partitions.
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
index 0df5d4d..f97ce29 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
@@ -160,7 +160,7 @@ public class LegacyTaskImpl implements Task {
this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
- Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
+ Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get())
.getAppenderFilePath(getId(), queryContext.getStagingDir());
LOG.info("Output File Path: " + outFilePath);
context.setOutputPath(outFilePath);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 5974693..7697458 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -154,7 +154,7 @@ public class TaskImpl implements Task {
this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
- Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
+ Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get())
.getAppenderFilePath(getId(), queryContext.getStagingDir());
LOG.info("Output File Path: " + outFilePath);
context.setOutputPath(outFilePath);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 43bb6c1..bd84283 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -27,7 +27,7 @@
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.storage.TableSpaceManager" %>
+<%@ page import="org.apache.tajo.storage.TablespaceManager" %>
<%@ page import="org.apache.tajo.storage.Tablespace" %>
<%@ page import="org.apache.tajo.util.NetUtils" %>
<%@ page import="org.apache.tajo.util.TUtil" %>
@@ -141,7 +141,7 @@
<h3>Tablespaces</h3>
<table width="100%" class="border_table" border="1">
<tr><th>Tablespace Name</th><th>URI</th><th>Handler</th></tr>
- <% for (Tablespace space : TableSpaceManager.getAllTablespaces()) {
+ <% for (Tablespace space : TablespaceManager.getAllTablespaces()) {
if (space.isVisible()) { %>
<tr><td><%=space.getName()%></td><td><%=space.getUri()%></td><td><%=space.getClass().getName()%></td></tr>
<% }}%>
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
index ca2378b..5df1122 100644
--- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -46,7 +46,7 @@ public class BackendTestingUtil {
public static void writeTmpTable(TajoConf conf, Path tablePath)
throws IOException {
- FileTablespace sm = TableSpaceManager.getDefault();
+ FileTablespace sm = TablespaceManager.getDefault();
Appender appender;
Path filePath = new Path(tablePath, "table.csv");
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 57b1e18..a323f25 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -348,6 +348,41 @@ public class QueryTestCaseBase {
}
/**
+ * It executes the query file and compare the result against the the result file.
+ *
+ * @throws Exception
+ */
+ public void assertQuery() throws Exception {
+ ResultSet res = null;
+ try {
+ res = executeQuery();
+ assertResultSet(res);
+ } finally {
+ if (res != null) {
+ res.close();
+ }
+ }
+ }
+
+ /**
+ * It executes a given query statement and verifies the result against the the result file.
+ *
+ * @param query A query statement
+ * @throws Exception
+ */
+ public void assertQueryStr(String query) throws Exception {
+ ResultSet res = null;
+ try {
+ res = executeString(query);
+ assertResultSet(res);
+ } finally {
+ if (res != null) {
+ res.close();
+ }
+ }
+ }
+
+ /**
* Execute a query contained in the file located in src/test/resources/results/<i>ClassName</i>/<i>MethodName</i>.
* <i>ClassName</i> and <i>MethodName</i> will be replaced by actual executed class and methods.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index acdae85..973f1e8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -48,7 +48,7 @@ import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
@@ -59,6 +59,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
@@ -346,18 +347,17 @@ public class TajoTestingCluster {
LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
if (!local) {
- c.setVar(ConfVars.ROOT_DIR, getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
- } else {
- c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
- }
+ String tajoRootDir = getMiniDFSCluster().getFileSystem().getUri().toString() + "/tajo";
+ c.setVar(ConfVars.ROOT_DIR, tajoRootDir);
- // Do not need for local file system
- if (!local) {
+ URI defaultTsUri = TajoConf.getWarehouseDir(c).toUri();
FileTablespace defaultTableSpace =
- new FileTablespace(TableSpaceManager.DEFAULT_TABLESPACE_NAME, TajoConf.getWarehouseDir(c).toUri());
+ new FileTablespace(TablespaceManager.DEFAULT_TABLESPACE_NAME, defaultTsUri);
defaultTableSpace.init(conf);
+ TablespaceManager.addTableSpaceForTest(defaultTableSpace);
- TableSpaceManager.addTableSpaceForTest(defaultTableSpace);
+ } else {
+ c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
}
setupCatalogForTesting(c, testBuildDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index ce951c6..26e25a4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -32,7 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.FileUtil;
import org.junit.After;
import org.junit.Before;
@@ -217,7 +217,7 @@ public class TestTajoCli {
if (!cluster.isHiveCatalogStoreRunning()) {
assertOutputResult(resultFileName, consoleResult, new String[]{"${table.path}"},
- new String[]{TableSpaceManager.getDefault().getTableUri("default", tableName).toString()});
+ new String[]{TablespaceManager.getDefault().getTableUri("default", tableName).toString()});
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 328f883..07a09ad 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -24,7 +24,6 @@ import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.cli.tsql.InvalidStatementException;
import org.apache.tajo.cli.tsql.ParsedResult;
import org.apache.tajo.cli.tsql.SimpleParser;
@@ -50,7 +49,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
import org.apache.tajo.plan.verifier.VerificationState;
import org.apache.tajo.storage.LazyTuple;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.BytesUtils;
@@ -104,7 +103,7 @@ public class ExprTestBase {
analyzer = new SQLAnalyzer();
preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
- planner = new LogicalPlanner(cat, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(cat, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(util.getConfiguration());
annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 80f3459..5a8238c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -27,7 +27,6 @@ import org.apache.tajo.algebra.OpType;
import org.apache.tajo.algebra.Selection;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
@@ -45,7 +44,7 @@ import org.apache.tajo.plan.function.GeneralFunction;
import org.apache.tajo.plan.logical.GroupbyNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.nameresolver.NameResolvingMode;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.AfterClass;
@@ -117,7 +116,7 @@ public class TestEvalTreeUtil {
catalog.createFunction(funcMeta);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
String[] QUERIES = {
"select name, score, age from people where score > 30", // 0
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 9aa7ddf..afa3472 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -34,7 +34,7 @@ import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.junit.AfterClass;
@@ -103,7 +103,7 @@ public class TestLogicalOptimizer {
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(util.getConfiguration());
defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 3cee816..dc9e2b0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -19,7 +19,7 @@
package org.apache.tajo.engine.planner;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.graph.SimpleDirectedGraph;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -40,7 +40,7 @@ public class TestLogicalPlan {
public static void setup() throws Exception {
util = new TajoTestingCluster();
util.startCatalogCluster();
- planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TablespaceManager.getInstance());
}
public static void tearDown() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 351a6af..0f37763 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -42,7 +42,7 @@ import org.apache.tajo.plan.*;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
@@ -131,7 +131,7 @@ public class TestLogicalPlanner {
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index d62eed2..fb35220 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -39,7 +39,7 @@ import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
@@ -109,7 +109,7 @@ public class TestPlannerUtil {
catalog.createFunction(funcDesc);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 2464fb1..ace3d0d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -86,7 +86,7 @@ public class TestBNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, schema, employeePath);
appender.init();
VTuple tuple = new VTuple(schema.size());
@@ -108,7 +108,7 @@ public class TestBNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
Path peoplePath = new Path(testDir, "people.csv");
- appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
@@ -124,7 +124,7 @@ public class TestBNLJoinExec {
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
}
@After
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 96a1f36..b4a6063 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -90,7 +90,7 @@ public class TestBSTIndexExec {
Path workDir = CommonTestingUtil.getTestDir();
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
- sm = TableSpaceManager.getLocalFs();
+ sm = TablespaceManager.getLocalFs();
idxPath = new Path(workDir, "test.idx");
@@ -148,7 +148,7 @@ public class TestBSTIndexExec {
catalog.createTable(desc);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index d94d3f6..cf5220e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -82,7 +82,7 @@ public class TestExternalSortExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, schema, employeePath);
appender.enableStats();
appender.init();
@@ -104,7 +104,7 @@ public class TestExternalSortExec {
employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
catalog.createTable(employee);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
}
@After
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index 21a101a..dc4dd04 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -104,7 +104,7 @@ public class TestFullOuterHashJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
VTuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -133,7 +133,7 @@ public class TestFullOuterHashJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
+ Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
VTuple tuple2 = new VTuple(job3Schema.size());
for (int i = 1; i < 4; i++) {
@@ -172,7 +172,7 @@ public class TestFullOuterHashJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
+ Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -224,7 +224,7 @@ public class TestFullOuterHashJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
@@ -234,7 +234,7 @@ public class TestFullOuterHashJoinExec {
catalog.createTable(phone3);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 0e2ce42..8fd61d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -109,7 +109,7 @@ public class TestFullOuterMergeJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
VTuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -147,7 +147,7 @@ public class TestFullOuterMergeJoinExec {
TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV");
Path dep4Path = new Path(testDir, "dep4.csv");
- Appender appender4 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path);
+ Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path);
appender4.init();
VTuple tuple4 = new VTuple(dep4Schema.size());
for (int i = 0; i < 11; i++) {
@@ -178,7 +178,7 @@ public class TestFullOuterMergeJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
+ Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
VTuple tuple2 = new VTuple(job3Schema.size());
for (int i = 1; i < 4; i++) {
@@ -217,7 +217,7 @@ public class TestFullOuterMergeJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
+ Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -269,7 +269,7 @@ public class TestFullOuterMergeJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
appender5.flush();
@@ -278,7 +278,7 @@ public class TestFullOuterMergeJoinExec {
catalog.createTable(phone3);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index d54df1c..1b64a8f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -83,7 +83,7 @@ public class TestHashAntiJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
VTuple tuple = new VTuple(employeeSchema.size());
@@ -109,7 +109,7 @@ public class TestHashAntiJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
Path peoplePath = new Path(testDir, "people.csv");
- appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
@@ -128,7 +128,7 @@ public class TestHashAntiJoinExec {
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
optimizer = new LogicalOptimizer(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index a8826ee..b9ee06a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -85,7 +85,7 @@ public class TestHashJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, employeeSchema, employeePath);
appender.init();
VTuple tuple = new VTuple(employeeSchema.size());
@@ -108,7 +108,7 @@ public class TestHashJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
Path peoplePath = new Path(testDir, "people.csv");
- appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+ appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
@@ -126,7 +126,7 @@ public class TestHashJoinExec {
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+ planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
}