You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/08/12 06:38:10 UTC
git commit: TAJO-118: Refactor and Improve text file Scanner. (jinho)
Updated Branches:
refs/heads/master 1c677cc5c -> cf6bd4b36
TAJO-118: Refactor and Improve text file Scanner. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/cf6bd4b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/cf6bd4b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/cf6bd4b3
Branch: refs/heads/master
Commit: cf6bd4b361b57ffebf1c99ed3e4179f4a155c159
Parents: 1c677cc
Author: jinossy <ji...@gmail.com>
Authored: Mon Aug 12 13:36:04 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Aug 12 13:36:04 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../planner/physical/BSTIndexScanExec.java | 1 +
.../engine/planner/physical/PhysicalExec.java | 2 +
.../engine/planner/physical/SeqScanExec.java | 30 ++++-
.../apache/tajo/engine/query/ResultSetImpl.java | 1 +
.../org/apache/tajo/master/ClientService.java | 1 +
.../org/apache/tajo/TajoTestingCluster.java | 6 +
.../planner/physical/TestPhysicalPlanner.java | 31 +++++-
.../tajo/worker/TestRangeRetrieverHandler.java | 4 +-
.../java/org/apache/tajo/storage/CSVFile.java | 110 +++++++++----------
.../org/apache/tajo/storage/FileScanner.java | 2 +
.../apache/tajo/storage/TestStorageManager.java | 2 +-
.../org/apache/tajo/storage/TestStorages.java | 2 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 8 ++
.../index/TestSingleCSVFileBSTIndex.java | 7 +-
15 files changed, 140 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7bea99..d48c15c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-118: Refactor and Improve text file Scanner. (jinho)
+
TAJO-95: Eliminate the lazy copy approach from the classes wrapping
protobuf-generated classes. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index e3b299d..a0d69e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -62,6 +62,7 @@ public class BSTIndexScanExec extends PhysicalExec {
this.fileScanner = (SeekableScanner)StorageManager.getScanner(context.getConf(),
fragment.getMeta(), fragment, outSchema);
+ this.fileScanner.init();
this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
this.evalContexts = projector.renew();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 1781304..c67c309 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -29,12 +29,14 @@ public abstract class PhysicalExec implements SchemaObject {
protected final TaskAttemptContext context;
protected final Schema inSchema;
protected final Schema outSchema;
+ protected final int outColumnNum;
public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
final Schema outSchema) {
this.context = context;
this.inSchema = inSchema;
this.outSchema = outSchema;
+ this.outColumnNum = outSchema.getColumnNum();
}
public final Schema getSchema() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 9279c01..dc42a71 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -19,14 +19,20 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
public class SeqScanExec extends PhysicalExec {
private final ScanNode plan;
@@ -56,6 +62,28 @@ public class SeqScanExec extends PhysicalExec {
}
public void init() throws IOException {
+ Schema projected;
+ if (plan.hasTargets()) {
+ projected = new Schema();
+ Set<Column> columnSet = new HashSet<Column>();
+
+ if (plan.hasQual()) {
+ columnSet.addAll(EvalTreeUtil.findDistinctRefColumns(qual));
+ }
+
+ for (Target t : plan.getTargets()) {
+ columnSet.addAll(EvalTreeUtil.findDistinctRefColumns(t.getEvalTree()));
+ }
+
+ for (Column column : inSchema.getColumns()) {
+ if (columnSet.contains(column)) {
+ projected.addColumn(column);
+ }
+ }
+ } else {
+ projected = outSchema;
+ }
+
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
this.evalContexts = projector.renew();
@@ -64,7 +92,7 @@ public class SeqScanExec extends PhysicalExec {
TUtil.newList(fragments));
} else {
this.scanner = StorageManager.getScanner(context.getConf(), fragments[0].getMeta(),
- fragments[0], plan.getOutSchema());
+ fragments[0], projected);
}
scanner.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index 62a3ceb..db049a0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -76,6 +76,7 @@ public class ResultSetImpl implements ResultSet {
this.totalRow = meta.getStat() != null ? meta.getStat().getNumRows() : 0;
Collection<Fragment> frags = getFragmentsNG(meta, path);
scanner = new MergeScanner(conf, meta, frags);
+ scanner.init();
init();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
index 827641a..a6a5f68 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
@@ -135,6 +135,7 @@ public class ClientService extends AbstractService {
if (e.getMessage() != null) {
build.setErrorMessage(ExceptionUtils.getStackTrace(e));
} else {
+ LOG.error("Internal Error", e);
build.setErrorMessage("Internal Error");
}
return build.build();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 9f620e8..4911b48 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -333,6 +333,12 @@ public class TajoTestingCluster {
conf.set("yarn.scheduler.capacity.root.queues", "default");
conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+ // fixed thread OOM
+ conf.setInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, 2);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 2);
+ conf.setInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, 2);
+ conf.setInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 2);
+
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index e329fad..3051204 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -173,7 +173,8 @@ public class TestPhysicalPlanner {
"select 3 < 4 as ineq, 3.5 * 2 as score", // 12
"select (1 > 0) and 3 > 1", // 13
"select deptName, class, sum(score), max(score), min(score) from score", // 14
- "select deptname, class, sum(score), max(score), min(score) from score group by deptname" // 15
+ "select deptname, class, sum(score), max(score), min(score) from score group by deptname", // 15
+ "select name from employee where empid >= 0", // 16
};
@Test
@@ -207,6 +208,34 @@ public class TestPhysicalPlanner {
}
@Test
+ public final void testCreateScanWithFilterPlan() throws IOException, PlanningException { // plans and executes QUERIES[16] against the employee table
+ Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+ employee.getPath(), Integer.MAX_VALUE);
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil
+ .newQueryUnitAttemptId(),
+ new Fragment[] { frags[0] }, workDir); // only the first fragment is handed to the task
+ Expr expr = analyzer.parse(QUERIES[16]); // QUERIES[16] is "select name from employee where empid >= 0"
+ LogicalPlan plan = planner.createPlan(expr);
+ LogicalNode rootNode =plan.getRootBlock().getRoot(); // root is captured before optimize(plan) is called
+ LogicalOptimizer.optimize(plan);
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+ Tuple tuple;
+ int i = 0; // number of tuples returned by the executor
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ assertTrue(tuple.contains(0)); // presumably verifies that field 0 of the result is populated - confirm against Tuple.contains
+ i++;
+ }
+ exec.close();
+ assertEquals(100, i); // the test expects exactly 100 tuples to come back
+ }
+
+ @Test
public final void testGroupByPlan() throws IOException, PlanningException {
Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
Integer.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 711d63d..109b3ab 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -163,7 +163,7 @@ public class TestRangeRetrieverHandler {
reader.open();
SeekableScanner scanner = (SeekableScanner)
sm.getScanner(conf, employeeMeta, StorageUtil.concatPath(testDir, "output", "output"));
-
+ scanner.init();
int cnt = 0;
while(scanner.next() != null) {
cnt++;
@@ -273,7 +273,7 @@ public class TestRangeRetrieverHandler {
reader.open();
SeekableScanner scanner = (SeekableScanner) StorageManager.getScanner(
conf, meta, StorageUtil.concatPath(testDir, "output", "output"));
-
+ scanner.init();
int cnt = 0;
while(scanner.next() != null) {
cnt++;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 498d0df..0cba890 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -19,6 +19,7 @@
package org.apache.tajo.storage;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,7 @@ import org.apache.tajo.storage.json.StorageGsonHelper;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Arrays;
public class CSVFile {
public static final String DELIMITER = "csvfile.delimiter";
@@ -49,7 +50,6 @@ public class CSVFile {
private final FileSystem fs;
private FSDataOutputStream fos;
private String delimiter;
-
private TableStatistics stats = null;
public CSVAppender(Configuration conf, final TableMeta meta,
@@ -58,7 +58,6 @@ public class CSVFile {
this.fs = path.getFileSystem(conf);
this.meta = meta;
this.schema = meta.getSchema();
-
this.delimiter = this.meta.getOption(DELIMITER, DELIMITER_DEFAULT);
}
@@ -198,13 +197,12 @@ public class CSVFile {
public CSVScanner(Configuration conf, final TableMeta meta,
final Fragment fragment) throws IOException {
super(conf, meta, fragment);
- init(fragment);
}
private static final byte LF = '\n';
- private final static long DEFAULT_BUFFER_SIZE = 65536;
+ private final static long DEFAULT_BUFFER_SIZE = 256 * 1024;
private long bufSize;
- private String delimiter;
+ private char delimiter;
private FileSystem fs;
private FSDataInputStream fis;
private long startOffset, length, startPos;
@@ -215,17 +213,15 @@ public class CSVFile {
private byte[] tail = null;
private long pageStart = -1;
private long prevTailLen = -1;
- private HashMap<Long, Integer> curTupleOffsetMap = null;
+ private int[] targetColumnIndexes;
- private void init(final Fragment fragment) throws IOException {
+ @Override
+ public void init() throws IOException {
// Buffer size, Delimiter
this.bufSize = DEFAULT_BUFFER_SIZE;
- this.delimiter = fragment.getMeta().getOption(DELIMITER,
- DELIMITER_DEFAULT);
- if (this.delimiter.equals("|")) {
- this.delimiter = "\\|";
- }
+ String delim = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
+ this.delimiter = delim.charAt(0);
// Fragment information
this.fs = fragment.getPath().getFileSystem(this.conf);
@@ -234,6 +230,16 @@ public class CSVFile {
this.length = fragment.getLength();
tuples = new String[0];
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+ }
+ super.init();
+
if (startOffset != 0) {
fis.seek(startOffset - 1);
while (fis.readByte() != LF) {
@@ -275,11 +281,11 @@ public class CSVFile {
buf = new byte[(int) bufSize];
rbyte = fis.read(buf);
tail = new byte[0];
- tuples = new String(buf,0,rbyte).split("\n");
+ tuples = StringUtils.split(new String(buf, 0, rbyte), (char)LF);
} else {
buf = new byte[(int) bufSize];
rbyte = fis.read(buf);
- tuples = (new String(tail) + new String(buf,0,rbyte)).split("\n");
+ tuples = StringUtils.split(new String(tail) + new String(buf, 0, rbyte), (char)LF);
}
// Check tail
@@ -293,8 +299,7 @@ public class CSVFile {
}
// Replace tuple
- tuples[tuples.length - 1] = new String(tuples[tuples.length - 1]
- + new String(temp,0,cnt));
+ tuples[tuples.length - 1] = tuples[tuples.length - 1] + new String(temp, 0, cnt);
validIdx = tuples.length;
} else {
tail = tuples[tuples.length - 1].getBytes();
@@ -309,26 +314,15 @@ public class CSVFile {
private void makeTupleOffset() {
long curTupleOffset = 0;
- this.tupleOffsets = null;
this.tupleOffsets = new long[this.validIdx];
-
- this.curTupleOffsetMap = null;
- this.curTupleOffsetMap = new HashMap<Long, Integer>();
-
for (int i = 0; i < this.validIdx; i++) {
this.tupleOffsets[i] = curTupleOffset + this.pageStart;
- this.curTupleOffsetMap.put(tupleOffsets[i], i);
- curTupleOffset += (this.tuples[i] + "\n").getBytes().length;
+ curTupleOffset += this.tuples[i].getBytes().length + 1;//tuple byte + 1byte line feed
}
}
@Override
- public void init() throws IOException {
- super.init();
- }
-
- @Override
public Tuple next() throws IOException {
try {
if (currentIdx == validIdx) {
@@ -340,58 +334,55 @@ public class CSVFile {
}
}
long offset = this.tupleOffsets[currentIdx];
- String[] cells = tuples[currentIdx++].split(delimiter);
- VTuple tuple = new VTuple(schema.getColumnNum());
+ String[] cells = StringUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter);
+ int targetLen = targets.length;
+ VTuple tuple = new VTuple(columnNum);
Column field;
tuple.setOffset(offset);
- for (int i = 0; i < schema.getColumnNum(); i++) {
- field = schema.getColumn(i);
- if (cells.length <= i) {
- tuple.put(i, DatumFactory.createNullDatum());
+ for (int i = 0; i < targetLen; i++) {
+ field = targets[i];
+ int tid = targetColumnIndexes[i];
+ if (cells.length <= tid) {
+ tuple.put(tid, DatumFactory.createNullDatum());
} else {
- String cell = cells[i].trim();
+ String cell = cells[tid].trim();
if (cell.equals("")) {
- tuple.put(i, DatumFactory.createNullDatum());
+ tuple.put(tid, DatumFactory.createNullDatum());
} else {
switch (field.getDataType().getType()) {
case BOOLEAN:
- tuple.put(i, DatumFactory.createBool(cell));
+ tuple.put(tid, DatumFactory.createBool(cell));
break;
case BIT:
- tuple.put(i,
- DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
+ tuple.put(tid, DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
break;
case CHAR:
- tuple.put(i, DatumFactory.createChar(cell.charAt(0)));
+ tuple.put(tid, DatumFactory.createChar(cell.charAt(0)));
break;
case BLOB:
- tuple.put(i,
- DatumFactory.createBlob(Base64.decodeBase64(cell)));
+ tuple.put(tid, DatumFactory.createBlob(Base64.decodeBase64(cell)));
break;
case INT2:
- tuple.put(i, DatumFactory.createInt2(cell));
+ tuple.put(tid, DatumFactory.createInt2(cell));
break;
case INT4:
- tuple.put(i, DatumFactory.createInt4(cell));
+ tuple.put(tid, DatumFactory.createInt4(cell));
break;
case INT8:
- tuple.put(i, DatumFactory.createInt8(cell));
+ tuple.put(tid, DatumFactory.createInt8(cell));
break;
case FLOAT4:
- tuple.put(i, DatumFactory.createFloat4(cell));
+ tuple.put(tid, DatumFactory.createFloat4(cell));
break;
case FLOAT8:
- tuple.put(i, DatumFactory.createFloat8(cell));
+ tuple.put(tid, DatumFactory.createFloat8(cell)); // tid is the schema column index; i only indexes targets
break;
-// case STRING:
-// tuple.put(i, DatumFactory.createText(cell));
-// break;
case TEXT:
- tuple.put(i, DatumFactory.createText(cell));
+ tuple.put(tid, DatumFactory.createText(cell));
break;
case INET4:
- tuple.put(i, DatumFactory.createInet4(cell));
+ tuple.put(tid, DatumFactory.createInet4(cell));
break;
case ARRAY:
Datum data = StorageGsonHelper.getInstance().fromJson(cell,
@@ -412,7 +403,7 @@ public class CSVFile {
@Override
public void reset() throws IOException {
- init(fragment);
+ init();
}
@Override
@@ -422,11 +413,7 @@ public class CSVFile {
@Override
public boolean isProjectable() {
- return false;
- }
-
- @Override
- public void setTarget(Column[] targets) {
+ return true;
}
@Override
@@ -440,8 +427,9 @@ public class CSVFile {
@Override
public void seek(long offset) throws IOException {
- if (this.curTupleOffsetMap.containsKey(offset)) {
- this.currentIdx = this.curTupleOffsetMap.get(offset);
+ int tupleIndex = Arrays.binarySearch(this.tupleOffsets, offset);
+ if (tupleIndex > -1) {
+ this.currentIdx = tupleIndex;
} else if (offset >= this.pageStart + this.bufSize
+ this.prevTailLen - this.tail.length || offset <= this.pageStart) {
fis.seek(offset);
@@ -455,7 +443,7 @@ public class CSVFile {
throw new IOException("invalid offset " +
" < pageStart : " + this.pageStart + " , " +
" pagelength : " + this.bufSize + " , " +
- " tail lenght : " + this.tail.length +
+ " tail lenght : " + this.tail.length +
" input offset : " + offset + " >");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index 4b8acc7..005879a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -31,6 +31,7 @@ public abstract class FileScanner implements Scanner {
protected final TableMeta meta;
protected final Schema schema;
protected final Fragment fragment;
+ protected final int columnNum;
protected Column [] targets;
@@ -39,6 +40,7 @@ public abstract class FileScanner implements Scanner {
this.meta = meta;
this.schema = meta.getSchema();
this.fragment = fragment;
+ this.columnNum = this.schema.getColumnNum();
}
public void init() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
index 7aed05d..4c4462f 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -83,7 +83,7 @@ public class TestStorageManager {
appender.close();
Scanner scanner = StorageManager.getScanner(conf, meta, path);
-
+ scanner.init();
int i=0;
while(scanner.next() != null) {
i++;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index b792d8b..b8c7021 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -170,7 +170,7 @@ public class TestStorages {
int tupleCnt = 0;
Tuple tuple;
while ((tuple = scanner.next()) != null) {
- if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI) {
+ if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI || storeType == StoreType.CSV) {
assertNull(tuple.get(0));
}
assertEquals(DatumFactory.createInt8(tupleCnt + 2), tuple.getLong(1));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index e66182a..63f2a2d 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -109,6 +109,7 @@ public class TestBSTIndex {
creater.open();
SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
Tuple keyTuple;
long offset;
while (true) {
@@ -130,6 +131,7 @@ public class TestBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
reader.open();
scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
@@ -204,6 +206,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.open();
SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
@@ -263,6 +266,7 @@ public class TestBSTIndex {
creater.open();
SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
Tuple keyTuple;
long offset;
while (true) {
@@ -330,6 +334,7 @@ public class TestBSTIndex {
creater.open();
SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
Tuple keyTuple;
long offset;
while (true) {
@@ -351,6 +356,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.open();
scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
Tuple result;
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
keyTuple = new VTuple(2);
@@ -414,6 +420,7 @@ public class TestBSTIndex {
creater.open();
SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
Tuple keyTuple;
long offset;
while (true) {
@@ -435,6 +442,7 @@ public class TestBSTIndex {
keySchema, comp);
reader.open();
scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+ scanner.init();
Tuple result;
for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
keyTuple = new VTuple(2);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 054e8fa..2708403 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -93,8 +93,7 @@ public class TestSingleCSVFileBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0,
- fileLen, null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -113,6 +112,7 @@ public class TestSingleCSVFileBSTIndex {
creater.open();
SeekableScanner fileScanner = new CSVScanner(conf, meta, tablet);
+ fileScanner.init();
Tuple keyTuple;
long offset;
while (true) {
@@ -136,6 +136,7 @@ public class TestSingleCSVFileBSTIndex {
"FindValueInCSV.idx"), keySchema, comp);
reader.open();
fileScanner = new CSVScanner(conf, meta, tablet);
+ fileScanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
@@ -200,6 +201,7 @@ public class TestSingleCSVFileBSTIndex {
creater.open();
SeekableScanner fileScanner = new CSVScanner(conf, meta, tablet);
+ fileScanner.init();
Tuple keyTuple;
long offset;
while (true) {
@@ -220,6 +222,7 @@ public class TestSingleCSVFileBSTIndex {
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
reader.open();
fileScanner = new CSVScanner(conf, meta, tablet);
+ fileScanner.init();
Tuple result;
for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
keyTuple = new VTuple(2);