You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/03/26 09:45:53 UTC
git commit: TAJO-692: Missing Null handling for INET4 in RowStoreUtil. (jihoon)
Repository: tajo
Updated Branches:
refs/heads/branch-0.8.0 5f0a5db12 -> c08a0128e
TAJO-692: Missing Null handling for INET4 in RowStoreUtil. (jihoon)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c08a0128
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c08a0128
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c08a0128
Branch: refs/heads/branch-0.8.0
Commit: c08a0128e7967269830bba8a0f346fbfccd910d9
Parents: 5f0a5db
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Mar 26 17:45:30 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Mar 26 17:45:30 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../engine/planner/RangePartitionAlgorithm.java | 2 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 13 +-
.../tajo/master/querymaster/Repartitioner.java | 4 +-
.../tajo/worker/RangeRetrieverHandler.java | 8 +-
.../planner/physical/TestBSTIndexExec.java | 5 -
.../planner/physical/TestPhysicalPlanner.java | 6 +-
.../apache/tajo/engine/query/TestNetTypes.java | 11 ++
.../apache/tajo/engine/util/TestTupleUtil.java | 15 ++-
.../tajo/worker/TestRangeRetrieverHandler.java | 6 +-
.../dataset/TestNetTypes/table2/table2.tbl | 3 +-
.../queries/TestNetTypes/testSort2.sql | 1 +
.../results/TestNetTypes/testJoin.result | 2 +-
.../results/TestNetTypes/testSort2.result | 6 +
.../tajo/pullserver/PullServerAuxService.java | 7 +-
.../tajo/pullserver/TajoPullServerService.java | 7 +-
.../org/apache/tajo/storage/RowStoreUtil.java | 122 +++++++++++--------
.../apache/tajo/storage/index/bst/BSTIndex.java | 30 +++--
18 files changed, 156 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d063ef7..b76a6b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -283,6 +283,8 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-692: Missing Null handling for INET4 in RowStoreUtil. (jihoon)
+
TAJO-701: Invalid bytes when creating BlobDatum with offset. (jinho)
TAJO-705: CTAS always stores tables with CSV storage type into catalog.
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index c4b1ae1..35876ba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -148,7 +148,7 @@ public abstract class RangePartitionAlgorithm {
throw new UnsupportedOperationException(dataType + " is not supported yet");
}
- return inclusive ? columnCard.add(new BigDecimal(1)) : columnCard;
+ return inclusive ? columnCard.add(new BigDecimal(1)).abs() : columnCard.abs();
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 54c6f74..9809aee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -30,7 +30,7 @@ import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;
@@ -46,11 +46,14 @@ public class TupleUtil {
public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
throws UnsupportedEncodingException {
+ return rangeToQuery(range, last, RowStoreEncoder.createInstance(schema));
+ }
+
+ public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder)
+ throws UnsupportedEncodingException {
StringBuilder sb = new StringBuilder();
- byte [] firstKeyBytes = RowStoreUtil.RowStoreEncoder
- .toBytes(schema, range.getStart());
- byte [] endKeyBytes = RowStoreUtil.RowStoreEncoder
- .toBytes(schema, range.getEnd());
+ byte [] firstKeyBytes = encoder.toBytes(range.getStart());
+ byte [] endKeyBytes = encoder.toBytes(range.getEnd());
String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes));
String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes));
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 36203bb..b2adaa4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -43,6 +43,7 @@ import org.apache.tajo.exception.InternalException;
import org.apache.tajo.master.TaskSchedulerContext;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.TUtil;
@@ -378,11 +379,12 @@ public class Repartitioner {
Set<URI> uris;
try {
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.RowStoreEncoder.createInstance(sortSchema);
for (int i = 0; i < ranges.length; i++) {
uris = new HashSet<URI>();
for (String uri: basicFetchURIs) {
String rangeParam =
- TupleUtil.rangeToQuery(sortSchema, ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
+ TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder);
URI finalUri = URI.create(uri + "&" + rangeParam);
uris.add(finalUri);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
index a54fa80..0e8ae72 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -57,6 +57,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
private final BSTIndex.BSTIndexReader idxReader;
private final Schema schema;
private final TupleComparator comp;
+ private final RowStoreDecoder decoder;
public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException {
this.file = outDir;
@@ -70,6 +71,7 @@ public class RangeRetrieverHandler implements RetrieverHandler {
this.idxReader.open();
LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+ idxReader.getLastKey());
+ this.decoder = RowStoreDecoder.createInstance(schema);
}
@Override
@@ -78,11 +80,11 @@ public class RangeRetrieverHandler implements RetrieverHandler {
// its validity of the file.
File data = new File(this.file, "data/data");
byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0));
- Tuple start = RowStoreUtil.RowStoreDecoder.toTuple(schema, startBytes);
+ Tuple start = decoder.toTuple(startBytes);
byte [] endBytes;
Tuple end;
endBytes = Base64.decodeBase64(kvs.get("end").get(0));
- end = RowStoreUtil.RowStoreDecoder.toTuple(schema, endBytes);
+ end = decoder.toTuple(endBytes);
boolean last = kvs.containsKey("final");
if(!comp.isAscendingFirstKey()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index b74527a..a47bde3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -187,11 +187,6 @@ public class TestBSTIndexExec {
assertEquals(tupleCount , counter);
}
- @After
- public void shutdown() {
-
- }
-
private class TmpPlanner extends PhysicalPlannerImpl {
public TmpPlanner(TajoConf conf, AbstractStorageManager sm) {
super(conf, sm);
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index f32cd1e..cf17d89 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -48,6 +48,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.CommonTestingUtil;
@@ -895,6 +896,7 @@ public class TestPhysicalPlanner {
// The below is for testing RangeRetrieverHandler.
+ RowStoreEncoder encoder = RowStoreEncoder.createInstance(keySchema);
RangeRetrieverHandler handler = new RangeRetrieverHandler(
new File(new Path(workDir, "output").toUri()), keySchema, comp);
Map<String,List<String>> kvs = Maps.newHashMap();
@@ -902,12 +904,12 @@ public class TestPhysicalPlanner {
startTuple.put(0, DatumFactory.createInt4(50));
kvs.put("start", Lists.newArrayList(
new String(Base64.encodeBase64(
- RowStoreUtil.RowStoreEncoder.toBytes(keySchema, startTuple), false))));
+ encoder.toBytes(startTuple), false))));
Tuple endTuple = new VTuple(1);
endTuple.put(0, DatumFactory.createInt4(80));
kvs.put("end", Lists.newArrayList(
new String(Base64.encodeBase64(
- RowStoreUtil.RowStoreEncoder.toBytes(keySchema, endTuple), false))));
+ encoder.toBytes(endTuple), false))));
FileChunk chunk = handler.get(kvs);
scanner.seek(chunk.startOffset());
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
index e65eb61..a318fee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java
@@ -74,6 +74,17 @@ public class TestNetTypes extends QueryTestCaseBase {
}
@Test
+ public final void testSort2() throws Exception {
+ // Skip all tests when HCatalogStore is used.
+ if (!testingCluster.isHCatalogStoreRunning()) {
+ // select addr from table2 order by addr;
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ }
+ }
+
+ @Test
public final void testJoin() throws Exception {
executeDDL("table1_ddl.sql", "table1");
executeDDL("table2_ddl.sql", "table2");
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index c6ec236..86fa798 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -28,7 +28,12 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
import org.apache.tajo.engine.planner.UniformRangePartition;
import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -64,9 +69,11 @@ public class TestTupleUtil {
DatumFactory.createBlob("hyunsik".getBytes()),
DatumFactory.createInet4("192.168.0.1")
});
-
- byte [] bytes = RowStoreUtil.RowStoreEncoder.toBytes(schema, tuple);
- Tuple tuple2 = RowStoreUtil.RowStoreDecoder.toTuple(schema, bytes);
+
+ RowStoreEncoder encoder = RowStoreEncoder.createInstance(schema);
+ RowStoreDecoder decoder = RowStoreDecoder.createInstance(schema);
+ byte [] bytes = encoder.toBytes(tuple);
+ Tuple tuple2 = decoder.toTuple(bytes);
assertEquals(tuple, tuple2);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index de4560e..4e770ce 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -42,6 +42,7 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.planner.physical.ProjectionExec;
import org.apache.tajo.engine.planner.physical.RangeShuffleFileWriteExec;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.CommonTestingUtil;
@@ -355,13 +356,14 @@ public class TestRangeRetrieverHandler {
private FileChunk getFileChunk(RangeRetrieverHandler handler, Schema keySchema,
TupleRange range, boolean last) throws IOException {
Map<String,List<String>> kvs = Maps.newHashMap();
+ RowStoreEncoder encoder = RowStoreEncoder.createInstance(keySchema);
kvs.put("start", Lists.newArrayList(
new String(Base64.encodeBase64(
- RowStoreUtil.RowStoreEncoder.toBytes(keySchema, range.getStart()),
+ encoder.toBytes(range.getStart()),
false))));
kvs.put("end", Lists.newArrayList(
new String(Base64.encodeBase64(
- RowStoreUtil.RowStoreEncoder.toBytes(keySchema, range.getEnd()), false))));
+ encoder.toBytes(range.getEnd()), false))));
if (last) {
kvs.put("final", Lists.newArrayList("true"));
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl
index f33b22c..9d46bc0 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl
+++ b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl
@@ -1,4 +1,5 @@
1|NULL|NULL|a|127.0.0.8
2|NULL|NULL|b|127.0.0.8
NULL|NULL|10.0|c|NULL
-NULL|NULL|20.0|d|127.0.0.1
\ No newline at end of file
+NULL|NULL|20.0|d|127.0.0.1
+NULL|NULL|20.0|d|255.255.255.255
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql
new file mode 100644
index 0000000..b613d4a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql
@@ -0,0 +1 @@
+select addr from table2 order by addr;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result
index b5817f8..9d2cdf1 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result
@@ -1,6 +1,6 @@
id,name,score,type,addr,id,name,score,type,addr
-------------------------------
-0,,20.0,d,127.0.0.1,1,ooo,1.1,a,127.0.0.1
1,,0.0,a,127.0.0.8,3,qqq,3.4,c,127.0.0.8
2,,0.0,b,127.0.0.8,3,qqq,3.4,c,127.0.0.8
+0,,20.0,d,127.0.0.1,1,ooo,1.1,a,127.0.0.1
0,,20.0,d,127.0.0.1,4,rrr,4.5,d,127.0.0.1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result
new file mode 100644
index 0000000..debbb98
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result
@@ -0,0 +1,6 @@
+addr
+-------------------------------
+127.0.0.1
+127.0.0.8
+127.0.0.8
+255.255.255.255
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index 8054a40..d098797 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -48,7 +48,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -563,17 +563,18 @@ public class PullServerAuxService extends AuxiliaryService {
byte [] startBytes = Base64.decodeBase64(startKey);
byte [] endBytes = Base64.decodeBase64(endKey);
+ RowStoreDecoder decoder = RowStoreDecoder.createInstance(keySchema);
Tuple start;
Tuple end;
try {
- start = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, startBytes);
+ start = decoder.toTuple(startBytes);
} catch (Throwable t) {
throw new IllegalArgumentException("StartKey: " + startKey
+ ", decoded byte size: " + startBytes.length, t);
}
try {
- end = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, endBytes);
+ end = decoder.toTuple(endBytes);
} catch (Throwable t) {
throw new IllegalArgumentException("EndKey: " + endKey
+ ", decoded byte size: " + endBytes.length, t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index a427635..c1fcef1 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -45,7 +45,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.RpcChannelFactory;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -560,17 +560,18 @@ public class TajoPullServerService extends AbstractService {
byte [] startBytes = Base64.decodeBase64(startKey);
byte [] endBytes = Base64.decodeBase64(endKey);
+ RowStoreDecoder decoder = RowStoreDecoder.createInstance(keySchema);
Tuple start;
Tuple end;
try {
- start = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, startBytes);
+ start = decoder.toTuple(startBytes);
} catch (Throwable t) {
throw new IllegalArgumentException("StartKey: " + startKey
+ ", decoded byte size: " + startBytes.length, t);
}
try {
- end = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, endBytes);
+ end = decoder.toTuple(endBytes);
} catch (Throwable t) {
throw new IllegalArgumentException("EndKey: " + endKey
+ ", decoded byte size: " + endBytes.length, t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 7a421a8..66d016b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -22,7 +22,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.BitArray;
import java.nio.ByteBuffer;
@@ -48,101 +48,90 @@ public class RowStoreUtil {
public static class RowStoreDecoder {
- public static Tuple toTuple(Schema schema, byte [] bytes) {
+ private Schema schema;
+ private BitArray nullFlags;
+ private int headerSize;
+
+ public static RowStoreDecoder createInstance(Schema schema) {
+ return new RowStoreDecoder(schema);
+ }
+
+ private RowStoreDecoder(Schema schema) {
+ this.schema = schema;
+ nullFlags = new BitArray(schema.size());
+ headerSize = nullFlags.bytesLength();
+ }
+
+
+ public Tuple toTuple(byte [] bytes) {
+ nullFlags.clear();
ByteBuffer bb = ByteBuffer.wrap(bytes);
Tuple tuple = new VTuple(schema.size());
Column col;
TajoDataTypes.DataType type;
+
+ bb.limit(headerSize);
+ nullFlags.fromByteBuffer(bb);
+ bb.limit(bytes.length);
+
for (int i =0; i < schema.size(); i++) {
+ if (nullFlags.get(i)) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ continue;
+ }
+
col = schema.getColumn(i);
type = col.getDataType();
switch (type.getType()) {
case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
case BIT:
byte b = bb.get();
- if(b == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createBit(b));
- }
+ tuple.put(i, DatumFactory.createBit(b));
break;
case CHAR:
byte c = bb.get();
- if(c == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createChar(c));
- }
+ tuple.put(i, DatumFactory.createChar(c));
break;
case INT2:
short s = bb.getShort();
- if(s < Short.MIN_VALUE + 1) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createInt2(s));
- }
+ tuple.put(i, DatumFactory.createInt2(s));
break;
case INT4:
case DATE:
int i_ = bb.getInt();
- if ( i_ < Integer.MIN_VALUE + 1) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createFromInt4(type, i_));
- }
+ tuple.put(i, DatumFactory.createFromInt4(type, i_));
break;
case INT8:
case TIME:
case TIMESTAMP:
long l = bb.getLong();
- if ( l < Long.MIN_VALUE + 1) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createFromInt8(type, l));
- }
+ tuple.put(i, DatumFactory.createFromInt8(type, l));
break;
case FLOAT4:
float f = bb.getFloat();
- if (Float.isNaN(f)) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createFloat4(f));
- }
+ tuple.put(i, DatumFactory.createFloat4(f));
break;
case FLOAT8:
double d = bb.getDouble();
- if(Double.isNaN(d)) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createFloat8(d));
- }
+ tuple.put(i, DatumFactory.createFloat8(d));
break;
case TEXT:
byte [] _string = new byte[bb.getInt()];
bb.get(_string);
- String str = new String(_string);
- if(str.compareTo("NULL") == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- }else {
- tuple.put(i, DatumFactory.createText(str));
- }
+ tuple.put(i, DatumFactory.createText(_string));
break;
case BLOB:
byte [] _bytes = new byte[bb.getInt()];
bb.get(_bytes);
- if(Bytes.compareTo(bytes, Bytes.toBytes("NULL")) == 0) {
- tuple.put(i, DatumFactory.createNullDatum());
- } else {
- tuple.put(i, DatumFactory.createBlob(_bytes));
- }
+ tuple.put(i, DatumFactory.createBlob(_bytes));
break;
case INET4:
@@ -156,17 +145,40 @@ public class RowStoreUtil {
}
return tuple;
}
+
+ public Schema getSchema() {
+ return schema;
+ }
}
public static class RowStoreEncoder {
+ private Schema schema;
+ private BitArray nullFlags;
+ private int headerSize;
- public static byte [] toBytes(Schema schema, Tuple tuple) {
+ public static RowStoreEncoder createInstance(Schema schema) {
+ return new RowStoreEncoder(schema);
+ }
+
+ private RowStoreEncoder(Schema schema) {
+ this.schema = schema;
+ nullFlags = new BitArray(schema.size());
+ headerSize = nullFlags.bytesLength();
+ }
+ public byte [] toBytes(Tuple tuple) {
+ nullFlags.clear();
int size = StorageUtil.getRowByteSize(schema);
- ByteBuffer bb = ByteBuffer.allocate(size);
+ ByteBuffer bb = ByteBuffer.allocate(size+headerSize);
+ bb.position(headerSize);
Column col;
for (int i = 0; i < schema.size(); i++) {
+ if (tuple.isNull(i)) {
+ nullFlags.set(i);
+ }
+
col = schema.getColumn(i);
switch (col.getDataType().getType()) {
+ case NULL_TYPE: nullFlags.set(i); break;
case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
case BIT: bb.put(tuple.get(i).asByte()); break;
case CHAR: bb.put(tuple.get(i).asByte()); break;
@@ -199,10 +211,20 @@ public class RowStoreUtil {
}
}
+ byte[] flags = nullFlags.toArray();
+ int finalPosition = bb.position();
+ bb.position(0);
+ bb.put(flags);
+
+ bb.position(finalPosition);
bb.flip();
byte [] buf = new byte [bb.limit()];
bb.get(buf);
return buf;
}
+
+ public Schema getSchema() {
+ return schema;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/c08a0128/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index bc8fe96..b149584 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -27,7 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.IndexMethod;
@@ -94,6 +95,7 @@ public class BSTIndex implements IndexMethod {
private Tuple firstKey;
private Tuple lastKey;
+ private RowStoreEncoder rowStoreEncoder;
// private Tuple lastestKey = null;
@@ -111,6 +113,7 @@ public class BSTIndex implements IndexMethod {
this.keySchema = keySchema;
this.compartor = comparator;
this.collector = new KeyOffsetCollector(comparator);
+ this.rowStoreEncoder = RowStoreEncoder.createInstance(keySchema);
}
public void setLoadNum(int loadNum) {
@@ -161,12 +164,10 @@ public class BSTIndex implements IndexMethod {
// entry
out.writeInt(entryNum);
if (entryNum > 0) {
- byte [] minBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema,
- firstKey);
+ byte [] minBytes = rowStoreEncoder.toBytes(firstKey);
out.writeInt(minBytes.length);
out.write(minBytes);
- byte [] maxBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema,
- lastKey);
+ byte [] maxBytes = rowStoreEncoder.toBytes(lastKey);
out.writeInt(maxBytes.length);
out.write(maxBytes);
}
@@ -197,7 +198,7 @@ public class BSTIndex implements IndexMethod {
}
}
/* key writing */
- byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(this.keySchema, key);
+ byte[] buf = rowStoreEncoder.toBytes(key);
out.writeInt(buf.length);
out.write(buf);
@@ -229,7 +230,7 @@ public class BSTIndex implements IndexMethod {
/* root key writing */
for (Tuple key : keySet) {
- byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(keySchema, key);
+ byte[] buf = rowStoreEncoder.toBytes(key);
rootOut.writeInt(buf.length);
rootOut.write(buf);
@@ -301,6 +302,8 @@ public class BSTIndex implements IndexMethod {
// mutex
private final Object mutex = new Object();
+ private RowStoreDecoder rowStoreDecoder;
+
/**
*
* @param fileName
@@ -312,6 +315,7 @@ public class BSTIndex implements IndexMethod {
this.fileName = fileName;
this.keySchema = keySchema;
this.comparator = comparator;
+ this.rowStoreDecoder = RowStoreDecoder.createInstance(keySchema);
}
public BSTIndexReader(final Path fileName) throws IOException {
@@ -336,6 +340,7 @@ public class BSTIndex implements IndexMethod {
builder.mergeFrom(schemaBytes);
SchemaProto proto = builder.build();
this.keySchema = new Schema(proto);
+ this.rowStoreDecoder = RowStoreDecoder.createInstance(keySchema);
// comparator
int compByteSize = indexIn.readInt();
@@ -353,11 +358,11 @@ public class BSTIndex implements IndexMethod {
if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values
byte [] minBytes = new byte[indexIn.readInt()];
Bytes.readFully(indexIn, minBytes, 0, minBytes.length);
- this.firstKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, minBytes);
+ this.firstKey = rowStoreDecoder.toTuple(minBytes);
byte [] maxBytes = new byte[indexIn.readInt()];
Bytes.readFully(indexIn, maxBytes, 0, maxBytes.length);
- this.lastKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, maxBytes);
+ this.lastKey = rowStoreDecoder.toTuple(maxBytes);
}
}
@@ -476,12 +481,11 @@ public class BSTIndex implements IndexMethod {
this.offsetSubIndex = new long[entryNum][];
byte[] buf;
-
for (int i = 0; i < entryNum; i++) {
counter++;
buf = new byte[in.readInt()];
Bytes.readFully(in, buf, 0, buf.length);
- dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+ dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
int offsetNum = in.readInt();
this.offsetSubIndex[i] = new long[offsetNum];
@@ -503,7 +507,7 @@ public class BSTIndex implements IndexMethod {
for (int i = 0; i < counter; i++) {
buf = new byte[in.readInt()];
Bytes.readFully(in, buf, 0, buf.length);
- dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+ dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
int offsetNum = in.readInt();
this.offsetSubIndex[i] = new long[offsetNum];
@@ -532,7 +536,7 @@ public class BSTIndex implements IndexMethod {
for (int i = 0; i < entryNum; i++) {
buf = new byte[in.readInt()];
Bytes.readFully(in, buf, 0, buf.length);
- keyTuple = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+ keyTuple = rowStoreDecoder.toTuple(buf);
dataIndex[i] = keyTuple;
this.offsetIndex[i] = in.readLong();
}