You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/10/01 08:38:29 UTC
git commit: TAJO-1083: StoreTableExec should be block iterative.
Repository: tajo
Updated Branches:
refs/heads/block_iteration c128eca31 -> 292f67438
TAJO-1083: StoreTableExec should be block iterative.
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/292f6743
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/292f6743
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/292f6743
Branch: refs/heads/block_iteration
Commit: 292f6743845139dfc4ae5d7d672386b25202beed
Parents: c128eca
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sun Sep 28 10:42:20 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sun Sep 28 10:42:20 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/tajo/SessionVars.java | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../org/apache/tajo/engine/eval/EvalNode.java | 7 +
.../apache/tajo/engine/planner/Projector.java | 23 +-
.../engine/planner/physical/PhysicalExec.java | 2 +-
.../engine/planner/physical/SeqScanExec.java | 24 +++
.../engine/planner/physical/StoreTableExec.java | 28 +++
.../tajo/engine/utils/TupleBuilderUtil.java | 102 +++++++++
.../org/apache/tajo/master/GlobalEngine.java | 20 ++
.../main/java/org/apache/tajo/worker/Task.java | 14 +-
.../physical/block/TestBlockIteratorExec.java | 211 +++++++++++++++++++
.../TestTajoCli/testHelpSessionVars.result | 1 +
.../tajo/tuple/offheap/OffHeapRowBlock.java | 6 +-
.../tuple/offheap/OffHeapRowBlockWriter.java | 2 +-
.../tajo/tuple/offheap/ResizableLimitSpec.java | 2 +-
.../apache/tajo/storage/TestNextFetches.java | 23 +-
16 files changed, 449 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index cc875b2..b63d4f4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -99,6 +99,8 @@ public enum SessionVars implements ConfigKey {
"shuffle output size for partition table write (mb)", DEFAULT),
// for physical Executors
+ EXEC_ENGINE(ConfVars.$EXECUTOR_ENGINE,
+ "executor engine types that queries will use. Types: volcano and block (default is volcano)", DEFAULT),
EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT),
HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT),
INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD,
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index b5a9b50..b1229eb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -317,6 +317,7 @@ public class TajoConf extends Configuration {
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256),
// for physical Executors
+ $EXECUTOR_ENGINE("tajo.executor.engine", "volcano"), // volcano, and block
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
(long)256 * 1048576),
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
index 754f888..b487001 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
@@ -23,8 +23,10 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.utils.TupleBuilderUtil;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
/**
* An annotated expression which includes actual data domains.
@@ -59,6 +61,11 @@ public abstract class EvalNode implements Cloneable, GsonObject {
public abstract <T extends Datum> T eval(Schema schema, Tuple tuple);
+ public void eval(Schema schema, Tuple tuple, RowWriter builder) {
+ Datum result = eval(schema, tuple);
+ TupleBuilderUtil.writeEvalResult(builder, result.type(), result);
+ }
+
@Deprecated
public abstract void preOrder(EvalNodeVisitor visitor);
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
index d8499d0..0d8bd5f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -21,7 +21,10 @@ package org.apache.tajo.engine.planner;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.utils.TupleBuilderUtil;
import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.TupleBuilder;
+import org.apache.tajo.tuple.offheap.RowWriter;
import org.apache.tajo.worker.TaskAttemptContext;
public class Projector {
@@ -33,7 +36,14 @@ public class Projector {
private final int targetNum;
private final EvalNode[] evals;
+ private final boolean useJITInSession;
+ private final boolean useJITInOperator;
+
public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) {
+ this(context, inSchema, outSchema, targets, true);
+ }
+
+ public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets, boolean useJIT) {
this.context = context;
this.inSchema = inSchema;
if (targets == null) {
@@ -45,7 +55,10 @@ public class Projector {
this.targetNum = this.targets.length;
evals = new EvalNode[targetNum];
- if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+ useJITInOperator = useJIT;
+ useJITInSession = context.getQueryContext().getBool(SessionVars.CODEGEN);
+
+ if (useJITInOperator && useJITInSession) {
EvalNode eval;
for (int i = 0; i < targetNum; i++) {
eval = this.targets[i].getEvalTree();
@@ -63,4 +76,12 @@ public class Projector {
out.put(i, evals[i].eval(inSchema, in));
}
}
+
+ public void eval(Tuple in, RowWriter builder) {
+ if (useJITInOperator && useJITInSession) {
+ TupleBuilderUtil.evaluateNative(inSchema, in, builder, evals);
+ } else {
+ TupleBuilderUtil.evaluate(inSchema, in, builder, evals);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 859c053..99cf610 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -63,7 +63,7 @@ public abstract class PhysicalExec implements SchemaObject {
public abstract Tuple next() throws IOException;
- public boolean nextFetch(OffHeapRowBlock rowBlock) {
+ public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 122d4f3..fd11c7b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -41,6 +41,11 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.TupleBuilder;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -67,6 +72,8 @@ public class SeqScanExec extends PhysicalExec {
private boolean cacheRead = false;
+ private OffHeapRowBlock inRowBlock;
+
public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan,
CatalogProtos.FragmentProto [] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -94,6 +101,8 @@ public class SeqScanExec extends PhysicalExec {
&& plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
rewriteColumnPartitionedTableSchema();
}
+
+ inRowBlock = new OffHeapRowBlock(inSchema, 64 * StorageUnit.KB);
}
/**
@@ -289,6 +298,21 @@ public class SeqScanExec extends PhysicalExec {
}
}
+ public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+ boolean noMoreTuple = scanner.nextFetch(inRowBlock);
+ if (!noMoreTuple) {
+ return false;
+ }
+
+ ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+ RowBlockReader reader = inRowBlock.getReader();
+ while (reader.next(zcTuple)) {
+ projector.eval(zcTuple, rowBlock.getWriter());
+ }
+
+ return true;
+ }
+
@Override
public void rescan() throws IOException {
scanner.reset();
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 3199b56..f88af15 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -28,9 +28,13 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.logical.InsertNode;
import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
+import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -121,6 +125,30 @@ public class StoreTableExec extends UnaryPhysicalExec {
return null;
}
+ ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+ RowBlockReader reader;
+
+ public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+ if (child.nextFetch(rowBlock)) {
+ reader = rowBlock.getReader();
+ while (reader.next(zcTuple)) {
+ appender.addTuple(zcTuple);;
+
+ if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) {
+ appender.close();
+
+ writtenFileNum++;
+ StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
+ openNewFile(writtenFileNum);
+ }
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
@Override
public void rescan() throws IOException {
// nothing to do
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java
new file mode 100644
index 0000000..dc0f058
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.TupleBuilder;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+public class TupleBuilderUtil {
+
+ public static void evaluate(Schema inSchema, Tuple input, RowWriter builder, EvalNode[] evals) {
+ builder.startRow();
+ for (int i = 0; i < evals.length; i++) {
+ Datum result = evals[i].eval(inSchema, input);
+ writeEvalResult(builder, result.type(), result);
+ }
+ builder.endRow();
+ }
+
+ public static void evaluateNative(Schema inSchema, Tuple input, RowWriter builder, EvalNode[] evals) {
+ builder.startRow();
+ for (int i = 0; i < evals.length; i++) {
+ evals[i].eval(inSchema, input, builder);
+ }
+ builder.endRow();
+ }
+
+ public static void writeEvalResult(RowWriter builder, TajoDataTypes.Type type, Datum datum) {
+ switch (type) {
+ case NULL_TYPE:
+ builder.skipField();
+ break;
+ case BOOLEAN:
+ builder.putBool(datum.asBool());
+ break;
+ case INT1:
+ case INT2:
+ builder.putInt2(datum.asInt2());
+ break;
+ case INT4:
+ builder.putInt4(datum.asInt4());
+ break;
+ case INT8:
+ builder.putInt8(datum.asInt8());
+ break;
+ case FLOAT4:
+ builder.putFloat4(datum.asFloat4());
+ break;
+ case FLOAT8:
+ builder.putFloat8(datum.asFloat8());
+ break;
+ case TIMESTAMP:
+ builder.putTimestamp(datum.asInt8());
+ break;
+ case TIME:
+ builder.putTime(datum.asInt8());
+ break;
+ case DATE:
+ builder.putDate(datum.asInt4());
+ break;
+ case INTERVAL:
+ builder.putInterval((org.apache.tajo.datum.IntervalDatum) datum);
+ break;
+ case CHAR:
+ case TEXT:
+ builder.putText(datum.asTextBytes());
+ break;
+ case BLOB:
+ builder.putBlob(datum.asByteArray());
+ break;
+ case INET4:
+ builder.putInet4(datum.asInt4());
+ break;
+ case PROTOBUF:
+ builder.putProtoDatum((org.apache.tajo.datum.ProtobufDatum) datum);
+ break;
+ default:
+ throw new UnsupportedException("Unknown Type: " + type.name());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 504a792..23c4949 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -115,6 +115,26 @@ public class GlobalEngine extends AbstractService {
super.stop();
}
+ public SQLAnalyzer getSQLAnalyzer() {
+ return analyzer;
+ }
+
+ public PreLogicalPlanVerifier getPreLogicalPlanVerifier() {
+ return preVerifier;
+ }
+
+ public LogicalPlanner getLogicalPlanner() {
+ return planner;
+ }
+
+ public LogicalOptimizer getLogicalOptimizer() {
+ return optimizer;
+ }
+
+ public LogicalPlanVerifier getLogicalPlanVerifier() {
+ return annotatedPlanVerifier;
+ }
+
private QueryContext createQueryContext(Session session) {
QueryContext newQueryContext = new QueryContext(context.getConf(), session);
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 5127e90..67210ea 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoProtos.TaskAttemptState;
@@ -57,6 +58,8 @@ import org.apache.tajo.storage.RawFile;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
@@ -446,7 +449,16 @@ public class Task {
createPlan(context, plan);
this.executor.init();
- while(!killed && !aborted && executor.next() != null) {
+ String engineType = context.getQueryContext().get(SessionVars.EXEC_ENGINE);
+ LOG.info(engineType.toUpperCase() + " Executor Engine is chosen.");
+ if (engineType.equalsIgnoreCase("volcano")) {
+ while (!killed && !aborted && executor.next() != null) {
+ }
+ } else if (engineType.equalsIgnoreCase("block")) {
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(executor.getSchema(), 64 * StorageUnit.KB);
+ while (!killed && !aborted && executor.nextFetch(rowBlock)) {
+ }
+ rowBlock.release();
}
} catch (Throwable e) {
error = e ;
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java
new file mode 100644
index 0000000..f6a1e05
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical.block;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.GlobalEngine;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+public class TestBlockIteratorExec extends QueryTestCaseBase {
+
+ private static SQLAnalyzer analyzer;
+ private static LogicalPlanner planner;
+ private static LogicalOptimizer optimizer;
+ private static PhysicalPlanner physicalPlanner;
+ private static AbstractStorageManager sm;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ GlobalEngine engine = testingCluster.getMaster().getContext().getGlobalEngine();
+ analyzer = engine.getSQLAnalyzer();
+ planner = engine.getLogicalPlanner();
+ optimizer = engine.getLogicalOptimizer();
+
+ Path path = CommonTestingUtil.getTestDir("target/test-data/TestBlockExecutor");
+ sm = StorageManagerFactory.getStorageManager(conf, path);
+
+ physicalPlanner = new PhysicalPlannerImpl(conf, sm);
+ }
+
+ private static int i = 0;
+ static Path outputPath;
+
+ /**
+ * Build a physical execution plan, which is a tree consisting of a number of physical executors.
+ *
+ * @param sql a SQL statement
+ * @return Physical Execution Plan
+ * @throws PlanningException
+ * @throws IOException
+ */
+ public static PhysicalExec buildPhysicalPlan(String sql) throws PlanningException, IOException {
+ Expr expr = analyzer.parse(sql);
+
+ QueryContext context = LocalTajoTestingUtility.createDummyContext(conf);
+ LogicalPlan plan = planner.createPlan(context, expr);
+ optimizer.optimize(context, plan);
+
+ LogicalNode [] founds = PlannerUtil.findAllNodes(plan.getRootBlock().getRoot(), NodeType.SCAN);
+
+ List<FileFragment> mergedFragments = Lists.newArrayList();
+
+ for (LogicalNode node : founds) {
+ ScanNode scan = (ScanNode) node;
+ TableDesc table = scan.getTableDesc();
+ FileFragment[] frags = StorageManager.splitNG(conf, scan.getCanonicalName(), table.getMeta(), table.getPath(),
+ Integer.MAX_VALUE);
+
+ for (FileFragment f : frags) {
+ mergedFragments.add(f);
+ }
+ }
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testdir_" + (i++));
+
+ TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), mergedFragments.toArray(new FileFragment[mergedFragments.size()]), workDir);
+
+ outputPath = new Path(workDir, "output");
+ ctx.setOutputPath(outputPath);
+ ctx.setEnforcer(new Enforcer());
+
+ return physicalPlanner.createPlan(ctx, plan.getRootBlock().getRoot());
+ }
+
+ @Test
+ public void testSeqScan() throws IOException, PlanningException {
+ PhysicalExec exec = buildPhysicalPlan("select * from lineitem");
+
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB);
+ rowBlock.setMaxRow(1024);
+
+ exec.init();
+
+ int countForTuple = 0;
+ int countForRowBlock = 0;
+ while(exec.nextFetch(rowBlock)) {
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ RowBlockReader reader = rowBlock.getReader();
+ while (reader.next(tuple)) {
+ countForTuple++;
+ }
+ countForRowBlock += rowBlock.rows();
+ }
+ exec.close();
+ rowBlock.release();
+
+ assertEquals(5, countForTuple);
+ assertEquals(5, countForRowBlock);
+ }
+
+ @Test
+ public void testScanWithProjector() throws IOException, PlanningException {
+ PhysicalExec exec = buildPhysicalPlan("select l_orderkey, l_partkey from lineitem");
+
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB);
+ rowBlock.setMaxRow(1024);
+
+ exec.init();
+
+ int countForTuple = 0;
+ int countForRowBlock = 0;
+ while(exec.nextFetch(rowBlock)) {
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ RowBlockReader reader = rowBlock.getReader();
+ while (reader.next(tuple)) {
+ countForTuple++;
+ }
+ countForRowBlock += rowBlock.rows();
+ }
+ exec.close();
+ rowBlock.release();
+
+ assertEquals(5, countForTuple);
+ assertEquals(5, countForRowBlock);
+ }
+
+ @Test
+ public void testStoreTableExec() throws IOException, PlanningException {
+ PhysicalExec exec = buildPhysicalPlan("create table t1 using CSV as select * from lineitem");
+
+
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB);
+ rowBlock.setMaxRow(1024);
+
+ exec.init();
+
+ int countForTuple = 0;
+ int countForRowBlock = 0;
+ while(exec.nextFetch(rowBlock)) {
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ RowBlockReader reader = rowBlock.getReader();
+ while (reader.next(tuple)) {
+ countForTuple++;
+ }
+ countForRowBlock += rowBlock.rows();
+ }
+ exec.close();
+
+ assertEquals(5, countForTuple);
+ assertEquals(5, countForRowBlock);
+
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(meta, exec.getSchema(), outputPath);
+ scanner.init();
+
+ int readTupleCount = 0;
+ while (scanner.nextFetch(rowBlock)) {
+ readTupleCount += rowBlock.rows();
+ }
+ scanner.close();
+
+ assertEquals(5, readTupleCount);
+
+ rowBlock.release();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index e6b12b1..f6edb3d 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -25,6 +25,7 @@ Available Session Variables:
\set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb)
\set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
\set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
+\set EXEC_ENGINE [text value] - executor engine types that queries will use. Types: volcano and block (default is volcano)
\set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
\set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
\set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
index 689efb7..ea86a5a 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -127,7 +127,11 @@ public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
return rowNum;
}
- public void setRows(int rowNum) {
+ public void setMaxRow(int rowNum) {
+ this.maxRowNum = rowNum;
+ }
+
+ void setRow(int rowNum) {
this.rowNum = rowNum;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
index d177e0c..ba59b16 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
@@ -48,7 +48,7 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter {
@Override
public void endRow() {
super.endRow();
- rowBlock.setRows(rowBlock.rows() + 1);
+ rowBlock.setRow(rowBlock.rows() + 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
index 14e67b2..8d782eb 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -105,7 +105,7 @@ public class ResizableLimitSpec {
}
public long remain(long currentSize) {
- Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+ Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes. But, its size is " + currentSize);
return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
index d1b3afd..e81964b 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
@@ -42,17 +42,14 @@ import org.apache.tajo.storage.rcfile.RCFile;
import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
import org.apache.tajo.tuple.RowBlockReader;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
-import org.apache.tajo.tuple.offheap.UnSafeTuple;
import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.UnsafeUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import sun.misc.Unsafe;
import java.io.IOException;
import java.util.Arrays;
@@ -173,7 +170,7 @@ public class TestNextFetches {
int tupleCnt = 0;
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
while (scanner.nextFetch(rowBlock)) {
tupleCnt += rowBlock.rows();
@@ -233,7 +230,7 @@ public class TestNextFetches {
int tupleCnt = 0;
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
while (scanner.nextFetch(rowBlock)) {
tupleCnt += rowBlock.rows();
@@ -294,7 +291,7 @@ public class TestNextFetches {
int tupleCnt = 0;
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple tuple = new ZeroCopyTuple();
while (scanner.nextFetch(rowBlock)) {
@@ -376,7 +373,7 @@ public class TestNextFetches {
scanner.init();
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple zcTuple = new ZeroCopyTuple();
while (scanner.nextFetch(rowBlock)) {
@@ -465,7 +462,7 @@ public class TestNextFetches {
scanner.init();
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple retrieved = new ZeroCopyTuple();
@@ -548,7 +545,7 @@ public class TestNextFetches {
scanner.init();
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple retrieved = new ZeroCopyTuple();
while (scanner.nextFetch(rowBlock)) {
@@ -624,7 +621,7 @@ public class TestNextFetches {
scanner.init();
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple retrieved = new ZeroCopyTuple();
while (scanner.nextFetch(rowBlock)) {
@@ -703,7 +700,7 @@ public class TestNextFetches {
assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple retrieved = new ZeroCopyTuple();
@@ -785,7 +782,7 @@ public class TestNextFetches {
assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple retrieved = new ZeroCopyTuple();
@@ -833,7 +830,7 @@ public class TestNextFetches {
scanner.init();
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
- rowBlock.setRows(1024);
+ rowBlock.setMaxRow(1024);
ZeroCopyTuple retrieved = new ZeroCopyTuple();