You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/05/18 01:54:35 UTC
[2/2] tajo git commit: TAJO-2146: Fragment interface cleanup.
TAJO-2146: Fragment interface cleanup.
Closes #1015
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1c44272b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1c44272b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1c44272b
Branch: refs/heads/master
Commit: 1c44272bff0fc0022a1c8ce060b70d11a30c59e0
Parents: 3de3774
Author: Jihoon Son <ji...@apache.org>
Authored: Wed May 18 10:54:02 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed May 18 10:54:02 2016 +0900
----------------------------------------------------------------------
CHANGES | 4 +-
.../src/main/proto/CatalogProtos.proto | 2 +-
.../org/apache/tajo/datum/TestInt4Datum.java | 6 +-
.../tajo/engine/query/TestHBaseTable.java | 44 +-
.../apache/tajo/storage/TestFileFragment.java | 13 +-
.../org/apache/tajo/storage/TestRowFile.java | 140 ------
.../engine/planner/PhysicalPlannerImpl.java | 4 +-
.../planner/physical/BSTIndexScanExec.java | 2 +-
.../planner/physical/ExternalSortExec.java | 4 +-
.../planner/physical/IndexExecutorUtil.java | 5 +-
.../planner/physical/PhysicalPlanUtil.java | 3 +-
.../engine/planner/physical/SeqScanExec.java | 2 +-
.../engine/planner/physical/StoreIndexExec.java | 2 +-
.../exec/NonForwardQueryResultFileScanner.java | 4 +-
.../java/org/apache/tajo/querymaster/Task.java | 19 +-
.../apache/tajo/worker/TaskAttemptContext.java | 15 +-
.../org/apache/tajo/storage/Tablespace.java | 15 +
.../storage/fragment/BuiltinFragmentKinds.java | 25 +
.../apache/tajo/storage/fragment/Fragment.java | 148 +++++-
.../storage/fragment/FragmentConvertor.java | 99 ++--
.../tajo/storage/fragment/FragmentSerde.java | 54 ++
.../src/main/resources/storage-default.xml | 46 +-
.../src/test/resources/storage-default.xml | 46 +-
.../tajo/storage/hbase/HBaseFragment.java | 187 +++----
.../tajo/storage/hbase/HBaseFragmentSerde.java | 60 +++
.../apache/tajo/storage/hbase/HBaseScanner.java | 20 +-
.../tajo/storage/hbase/HBaseTablespace.java | 4 +-
.../org/apache/tajo/storage/FileTablespace.java | 4 +-
.../java/org/apache/tajo/storage/RawFile.java | 2 +-
.../java/org/apache/tajo/storage/RowFile.java | 502 -------------------
.../tajo/storage/fragment/FileFragment.java | 174 +------
.../storage/fragment/FileFragmentSerde.java | 66 +++
.../tajo/storage/text/DelimitedLineReader.java | 2 +-
.../tajo/storage/text/DelimitedTextFile.java | 2 +-
.../storage/thirdparty/orc/OrcRecordReader.java | 2 +-
.../apache/tajo/storage/TestFileTablespace.java | 10 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 41 +-
.../index/TestSingleCSVFileBSTIndex.java | 8 +-
.../src/test/resources/storage-default.xml | 38 +-
.../apache/tajo/storage/jdbc/JdbcFragment.java | 86 +---
.../tajo/storage/jdbc/JdbcFragmentSerde.java | 47 ++
.../apache/tajo/storage/jdbc/JdbcScanner.java | 2 +-
.../tajo/storage/jdbc/JdbcTablespace.java | 2 +-
43 files changed, 705 insertions(+), 1256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8f6c0fe..beee5be 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,7 +4,7 @@ Release 0.12.0 - unreleased
NEW FEATURES
- TAJO-1686: Allow Tajo to use Hive UDF. (jihoon)
+ TAJO-1686: Allow Tajo to use Hive UDF. (Jongyoung Park via jihoon)
TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
@@ -14,6 +14,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-2146: Fragment interface cleanup. (jihoon)
+
TAJO-2129: Apply new type implementation to Schema and Catalog. (hyunsik)
TAJO-2071: Supporting DATE type in Parquet format.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index e79bc75..b42cf58 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -75,7 +75,7 @@ message SchemaProto {
message FragmentProto {
required string id = 1;
- required string data_format = 2;
+ required string kind = 2;
required bytes contents = 3;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
index 559bed3..294f8bb 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
@@ -18,12 +18,10 @@
package org.apache.tajo.datum;
-import org.junit.Test;
import org.apache.tajo.common.TajoDataTypes;
+import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestInt4Datum {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 8914e3b..5819179 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -555,8 +555,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
Tablespace tablespace = TablespaceManager.getByName("cluster1");
List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(1, fragments.size());
- assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
- assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
+ assertEquals("021", ((HBaseFragment)fragments.get(0)).getStartKey().toString());
+ assertEquals("021" + postFix, ((HBaseFragment)fragments.get(0)).getEndKey().toString());
// where rk >= '020' and rk <= '055'
EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -569,12 +569,12 @@ public class TestHBaseTable extends QueryTestCaseBase {
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(2, fragments.size());
HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("055" + postFix, fragment2.getEndKey().toString());
// where (rk >= '020' and rk <= '055') or rk = '075'
EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -584,16 +584,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("055" + postFix, fragment2.getEndKey().toString());
HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
- assertEquals("075", new String(fragment3.getStartRow()));
- assertEquals("075" + postFix, new String(fragment3.getStopRow()));
+ assertEquals("075", fragment3.getStartKey().toString());
+ assertEquals("075" + postFix, fragment3.getEndKey().toString());
// where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
@@ -608,16 +608,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("055" + postFix, fragment2.getEndKey().toString());
fragment3 = (HBaseFragment) fragments.get(2);
- assertEquals("072", new String(fragment3.getStartRow()));
- assertEquals("078" + postFix, new String(fragment3.getStopRow()));
+ assertEquals("072", fragment3.getStartKey().toString());
+ assertEquals("078" + postFix, fragment3.getEndKey().toString());
// where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -631,12 +631,12 @@ public class TestHBaseTable extends QueryTestCaseBase {
assertEquals(2, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
- assertEquals("020", new String(fragment1.getStartRow()));
- assertEquals("040", new String(fragment1.getStopRow()));
+ assertEquals("020", fragment1.getStartKey().toString());
+ assertEquals("040", fragment1.getEndKey().toString());
fragment2 = (HBaseFragment) fragments.get(1);
- assertEquals("040", new String(fragment2.getStartRow()));
- assertEquals("059" + postFix, new String(fragment2.getStopRow()));
+ assertEquals("040", fragment2.getStartKey().toString());
+ assertEquals("059" + postFix, fragment2.getEndKey().toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
index 86f7d1a..cccff70 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
@@ -20,6 +20,8 @@ package org.apache.tajo.storage;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.CommonTestingUtil;
@@ -34,10 +36,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestFileFragment {
+ private TajoConf conf;
private Path path;
@Before
public final void setUp() throws Exception {
+ conf = new TajoConf();
path = CommonTestingUtil.getTestDir();
}
@@ -45,7 +49,7 @@ public class TestFileFragment {
public final void testGetAndSetFields() {
FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
- assertEquals("table1_1", fragment1.getTableName());
+ assertEquals("table1_1", fragment1.getInputSourceId());
assertEquals(new Path(path, "table0"), fragment1.getPath());
assertTrue(0 == fragment1.getStartKey());
assertTrue(500 == fragment1.getLength());
@@ -55,8 +59,9 @@ public class TestFileFragment {
public final void testGetProtoAndRestore() {
FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
- FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto());
- assertEquals("table1_1", fragment1.getTableName());
+ FileFragment fragment1 = FragmentConvertor.convert(conf, BuiltinFragmentKinds.FILE,
+ FragmentConvertor.toFragmentProto(conf, fragment));
+ assertEquals("table1_1", fragment1.getInputSourceId());
assertEquals(new Path(path, "table0"), fragment1.getPath());
assertTrue(0 == fragment1.getStartKey());
assertTrue(500 == fragment1.getLength());
@@ -73,7 +78,7 @@ public class TestFileFragment {
Arrays.sort(tablets);
for(int i = 0; i < num; i++) {
- assertEquals("tablet1_"+i, tablets[i].getTableName());
+ assertEquals("tablet1_"+i, tablets[i].getInputSourceId());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
deleted file mode 100644
index 0173f59..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaBuilder;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestRowFile {
- private static final Log LOG = LogFactory.getLog(TestRowFile.class);
-
- private TajoTestingCluster cluster;
- private TajoConf conf;
-
- @Before
- public void setup() throws Exception {
- cluster = TpchTestBase.getInstance().getTestingCluster();
- conf = cluster.getConfiguration();
- }
-
- @After
- public void teardown() throws Exception {
- }
-
- @Test
- public void test() throws IOException {
- Schema schema = SchemaBuilder.builder()
- .add("id", Type.INT4)
- .add("age", Type.INT8)
- .add("description", Type.TEXT)
- .build();
-
- TableMeta meta = CatalogUtil.newTableMeta("ROWFILE", conf);
-
- FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri());
-
- Path tablePath = new Path("/test");
- Path dataPath = new Path(tablePath, "test.tbl");
- FileSystem fs = sm.getFileSystem();
- fs.mkdirs(tablePath);
-
- Appender appender = sm.getAppender(meta, schema, dataPath);
- appender.enableStats();
- appender.init();
-
- int tupleNum = 200;
- Tuple tuple;
- Datum stringDatum = DatumFactory.createText("abcdefghijklmnopqrstuvwxyz");
- Set<Integer> idSet = Sets.newHashSet();
-
- tuple = new VTuple(3);
- long start = System.currentTimeMillis();
- for(int i = 0; i < tupleNum; i++) {
- tuple.put(0, DatumFactory.createInt4(i + 1));
- tuple.put(1, DatumFactory.createInt8(25l));
- tuple.put(2, stringDatum);
- appender.addTuple(tuple);
- idSet.add(i+1);
- }
- appender.close();
-
- TableStats stat = appender.getStats();
- assertEquals(tupleNum, stat.getNumRows().longValue());
-
- FileStatus file = fs.getFileStatus(dataPath);
- FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen());
-
- int tupleCnt = 0;
- start = System.currentTimeMillis();
- Scanner scanner = sm.getScanner(meta, schema, fragment, null);
- scanner.init();
- while ((tuple=scanner.next()) != null) {
- tupleCnt++;
- }
- scanner.close();
-
- assertEquals(tupleNum, tupleCnt);
-
- tupleCnt = 0;
- long fileStart = 0;
- long fileLen = file.getLen()/13;
-
- for (int i = 0; i < 13; i++) {
- fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen);
- scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment);
- scanner.init();
- while ((tuple=scanner.next()) != null) {
- if (!idSet.remove(tuple.getInt4(0)) && LOG.isDebugEnabled()) {
- LOG.debug("duplicated! " + tuple.getInt4(0));
- }
- tupleCnt++;
- }
- scanner.close();
- fileStart += fileLen;
- if (i == 11) {
- fileLen = file.getLen() - fileStart;
- }
- }
- assertEquals(tupleNum, tupleCnt);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 87a6e74..fce56fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -923,13 +923,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
List<Fragment> fileFragments = new ArrayList<>();
- FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri());
+ FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri());
for (Path path : partitionedTableScanNode.getInputPaths()) {
fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path)));
}
FragmentProto[] fragments =
- FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()]));
+ FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()]));
ctx.addFragments(scanNode.getCanonicalName(), fragments);
return new PartitionMergeScanExec(ctx, scanNode, fragments);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 3be1d36..7ab0943 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -86,7 +86,7 @@ public class BSTIndexScanExec extends ScanExec {
this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
- Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment));
+ Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(context.getConf(), fragment));
this.reader = new BSTIndex(context.getConf()).
getIndexReader(indexPath, keySchema, comparator);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 3d2de28..4ec5144 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -170,7 +170,7 @@ public class ExternalSortExec extends SortExec {
mergedInputFragments = new ArrayList<>();
for (CatalogProtos.FragmentProto proto : fragments) {
- FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+ FileFragment fragment = FragmentConvertor.convert(context.getConf(), proto);
mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta()));
}
}
@@ -464,7 +464,7 @@ public class ExternalSortExec extends SortExec {
debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem());
}
chunk.getMemoryTuples().release();
- } else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
+ } else if (chunk.getFragment().getInputSourceId().contains(INTERMEDIATE_FILE_PREFIX)) {
localFS.delete(chunk.getFragment().getPath(), true);
numDeletedFiles++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
index 3b8317f..df143c8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
@@ -18,14 +18,15 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
public class IndexExecutorUtil {
- public static String getIndexFileName(FragmentProto fragmentProto) {
- FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, fragmentProto);
+ public static String getIndexFileName(Configuration conf, FragmentProto fragmentProto) {
+ FileFragment fileFragment = FragmentConvertor.convert(conf, fragmentProto);
StringBuilder sb = new StringBuilder();
sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
return sb.toString();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index d1dfe40..074d0ab 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -40,6 +40,7 @@ import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import java.io.IOException;
@@ -120,7 +121,7 @@ public class PhysicalPlanUtil {
fragments.add(fileFragment);
}
}
- return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[fragments.size()]));
+ return FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index dc48f3f..52cb080 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -97,7 +97,7 @@ public class SeqScanExec extends ScanExec {
Tuple partitionRow = null;
if (fragments != null && fragments.length > 0) {
- List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+ List<FileFragment> fileFragments = FragmentConvertor.convert(context.getConf(), fragments);
// Get a partition key value from a given path
partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath(
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
index c5e1093..46f672d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -72,7 +72,7 @@ public class StoreIndexExec extends UnaryPhysicalExec {
TajoConf conf = context.getConf();
Path indexPath = new Path(logicalPlan.getIndexPath().toString(),
- IndexExecutorUtil.getIndexFileName(scanExec.getFragments()[0]));
+ IndexExecutorUtil.getIndexFileName(conf, scanExec.getFragments()[0]));
// TODO: Create factory using reflection
BSTIndex bst = new BSTIndex(conf);
this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 871db89..e8b8b45 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -108,7 +108,9 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true));
if (!fragments.isEmpty()) {
- FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()]));
+ FragmentProto[] fragmentProtos =
+ FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
+
this.taskContext = new TaskAttemptContext(
new QueryContext(tajoConf), null,
new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index f8b89f1..f1ad931 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -20,6 +20,7 @@ package org.apache.tajo.querymaster;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -281,7 +282,7 @@ public class Task implements EventHandler<TaskEvent> {
fragmentList.add(fragment.toString());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- fragmentList.add("ERROR: " + eachFragment.getDataFormat() + "," + eachFragment.getId() + ": " + e.getMessage());
+ fragmentList.add("ERROR: " + eachFragment.getKind() + "," + eachFragment.getId() + ": " + e.getMessage());
}
}
taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()]));
@@ -330,25 +331,25 @@ public class Task implements EventHandler<TaskEvent> {
}
private void addDataLocation(Fragment fragment) {
- String[] hosts = fragment.getHosts();
- int[] diskIds = null;
+ ImmutableList<String> hosts = fragment.getHostNames();
+ Integer[] diskIds = null;
if (fragment instanceof FileFragment) {
diskIds = ((FileFragment)fragment).getDiskIds();
}
- for (int i = 0; i < hosts.length; i++) {
- dataLocations.add(new DataLocation(hosts[i], diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i]));
+ for (int i = 0; i < hosts.size(); i++) {
+ dataLocations.add(new DataLocation(hosts.get(i), diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i]));
}
}
public void addFragment(Fragment fragment, boolean useDataLocation) {
Set<FragmentProto> fragmentProtos;
- if (fragMap.containsKey(fragment.getTableName())) {
- fragmentProtos = fragMap.get(fragment.getTableName());
+ if (fragMap.containsKey(fragment.getInputSourceId())) {
+ fragmentProtos = fragMap.get(fragment.getInputSourceId());
} else {
fragmentProtos = new HashSet<>();
- fragMap.put(fragment.getTableName(), fragmentProtos);
+ fragMap.put(fragment.getInputSourceId(), fragmentProtos);
}
- fragmentProtos.add(fragment.getProto());
+ fragmentProtos.add(FragmentConvertor.toFragmentProto(systemConf, fragment));
if (useDataLocation) {
addDataLocation(fragment);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 47f1af2..1cba931 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -128,7 +128,8 @@ public class TaskAttemptContext {
@VisibleForTesting
public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId taskAttemptId,
final Fragment [] fragments, final Path workDir) {
- this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+ this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(queryContext.getConf(), fragments),
+ workDir);
}
public TajoConf getConf() {
@@ -258,12 +259,12 @@ public class TaskAttemptContext {
public void updateAssignedFragments(String tableId, Fragment[] fragments) {
fragmentMap.remove(tableId);
for(Fragment t : fragments) {
- if (fragmentMap.containsKey(t.getTableName())) {
- fragmentMap.get(t.getTableName()).add(t.getProto());
+ if (fragmentMap.containsKey(t.getInputSourceId())) {
+ fragmentMap.get(t.getInputSourceId()).add(FragmentConvertor.toFragmentProto(getConf(), t));
} else {
List<FragmentProto> frags = new ArrayList<>();
- frags.add(t.getProto());
- fragmentMap.put(t.getTableName(), frags);
+ frags.add(FragmentConvertor.toFragmentProto(getConf(), t));
+ fragmentMap.put(t.getInputSourceId(), frags);
}
}
}
@@ -281,7 +282,7 @@ public class TaskAttemptContext {
List<Path> paths = fragmentToPath(tableFragments);
for (FragmentProto eachFragment: fragments) {
- FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
+ FileFragment fileFragment = FragmentConvertor.convert(getConf(), eachFragment);
// If current attempt already has same path, we don't need to add it to fragments.
if (!paths.contains(fileFragment.getPath())) {
tableFragments.add(eachFragment);
@@ -297,7 +298,7 @@ public class TaskAttemptContext {
List<Path> list = new ArrayList<>();
for (FragmentProto proto : tableFragments) {
- FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+ FileFragment fragment = FragmentConvertor.convert(getConf(), proto);
list.add(fragment.getPath());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 4afb383..4fcd5dc 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -244,6 +244,21 @@ public abstract class Tablespace {
}
/**
+ * Returns a seekable Scanner instance for the given fragment.
+ *
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @param target The output schema
+ * @return Scanner instance
+ * @throws IOException
+ */
+ public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, Fragment fragment,
+ Schema target) throws IOException {
+ return (SeekableScanner)this.getScanner(meta, schema, fragment, target);
+ }
+
+ /**
* Returns Appender instance.
* @param queryContext Query property.
* @param taskAttemptId Task id.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
new file mode 100644
index 0000000..9c4fce5
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+public class BuiltinFragmentKinds {
+ public static final String FILE = "FILE";
+ public static final String HBASE = "HBASE";
+ public static final String JDBC = "JDBC";
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
index ac43197..a8de4ab 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -18,22 +18,150 @@
package org.apache.tajo.storage.fragment;
-import org.apache.tajo.common.ProtoObject;
+import com.google.common.collect.ImmutableList;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import java.net.URI;
-public interface Fragment extends ProtoObject<FragmentProto> {
+/**
+ * The Fragment is similar to the split in MapReduce.
+ * For distributed processing of a single large table,
+ * it contains the information of which part of data will be processed by each task.
+ *
+ * @param <T> type of fragment key. It should implement the Comparable interface.
+ */
+public abstract class Fragment<T extends Comparable> implements Comparable<Fragment<T>>, Cloneable {
- public abstract String getTableName();
+ protected String kind;
+ protected URI uri;
+ protected String inputSourceId;
+ protected T startKey;
+ protected T endKey;
+ protected long length;
+ protected ImmutableList<String> hostNames;
- @Override
- public abstract FragmentProto getProto();
+ protected Fragment(String kind,
+ URI uri,
+ String inputSourceId,
+ T startKey,
+ T endKey,
+ long length,
+ String[] hostNames) {
+ this.kind = kind;
+ this.uri = uri;
+ this.inputSourceId = inputSourceId;
+ this.startKey = startKey;
+ this.endKey = endKey;
+ this.length = length;
+ this.hostNames = hostNames == null ? ImmutableList.of() : ImmutableList.copyOf(hostNames);
+ }
+
+ /**
+ * Returns the fragment type.
+ *
+ * @return fragment type
+ */
+ public final String getKind() {
+ return kind;
+ }
+
+ /**
+ * Returns a unique URI of the input source.
+ *
+ * @return URI of the input source
+ */
+ public final URI getUri() {
+ return uri;
+ }
- public abstract long getLength();
+ /**
+ * Returns a unique id of the input source.
+ *
+ * @return id of the input source
+ */
+ public final String getInputSourceId() {
+ return this.inputSourceId;
+ }
- public abstract String getKey();
+ /**
+ * Returns the start key of the data range.
+ * {@link org.apache.tajo.storage.Scanner} will start reading data from the point indicated by this key.
+ *
+ * @return start key
+ */
+ public final T getStartKey() {
+ return startKey;
+ }
- public String[] getHosts();
+ /**
+ * Returns the end key of the data range.
+ * {@link org.apache.tajo.storage.Scanner} will stop reading data when it reaches the point indicated by this key.
+ *
+ * @return end key
+ */
+ public final T getEndKey() {
+ return endKey;
+ }
- public abstract boolean isEmpty();
+ /**
+ * Returns the length of the data range between start key and end key.
+ *
+ * @return length of the range
+ */
+ public final long getLength() {
+ return length;
+ }
+
+ /**
+ * Returns host names which have any portion of the data between start key and end key.
+ *
+ * @return host names
+ */
+ public final ImmutableList<String> getHostNames() {
+ return hostNames;
+ }
+
+ /**
+ * Indicates whether the fragment is empty.
+ * An empty fragment means that there is no data to read.
+ *
+ * @return true if the fragment is empty. Otherwise, false.
+ */
+ public boolean isEmpty() {
+ return length == 0;
+ }
+
+ /**
+ * First compares URIs of fragments, and then compares their start keys.
+ *
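+ * @param t the fragment to compare with
+ * @return 0 if the URIs are equal and both non-null start keys compare equal; otherwise a negative value if this
+ * fragment sorts before the other and a positive value if it sorts after. A null start key sorts last.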
+ */
+ @Override
+ public final int compareTo(Fragment<T> t) {
+ int cmp = uri.compareTo(t.uri);
+ if (cmp == 0) {
+ if (startKey != null && t.startKey != null) {
+ return startKey.compareTo(t.startKey);
+ } else if (startKey == null) { // nulls last
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ return cmp;
+ }
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ Fragment clone = (Fragment) super.clone();
+ clone.uri = this.uri;
+ clone.inputSourceId = this.inputSourceId;
+ clone.startKey = this.startKey;
+ clone.endKey = this.endKey;
+ clone.hostNames = this.hostNames;
+ clone.length = this.length;
+ return clone;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
index 4ce6928..835a714 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -20,13 +20,12 @@ package org.apache.tajo.storage.fragment;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.annotation.ThreadSafe;
import org.apache.tajo.exception.TajoInternalError;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
@@ -34,95 +33,81 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@ThreadSafe
public class FragmentConvertor {
- /**
- * Cache of fragment classes
- */
- protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
- /**
- * Cache of constructors for each class.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
- /**
- * default parameter for all constructors
- */
- private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class };
- public static Class<? extends Fragment> getFragmentClass(Configuration conf, String dataFormat) {
- Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(dataFormat.toLowerCase());
- if (fragmentClass == null) {
- fragmentClass = conf.getClass(
- String.format("tajo.storage.fragment.%s.class", dataFormat.toLowerCase()), null, Fragment.class);
- CACHED_FRAGMENT_CLASSES.put(dataFormat.toLowerCase(), fragmentClass);
+ private static final Map<String, FragmentSerde> SERDE_MAP = Maps.newConcurrentMap();
+
+ private static FragmentSerde getFragmentSerde(Configuration conf, String fragmentKind) {
+ fragmentKind = fragmentKind.toLowerCase();
+ FragmentSerde serde = SERDE_MAP.get(fragmentKind);
+ if (serde == null) {
+ Class<? extends FragmentSerde> serdeClass = conf.getClass(
+ String.format("tajo.storage.fragment.serde.%s", fragmentKind), null, FragmentSerde.class);
+ try {
+ serde = serdeClass.getConstructor(null).newInstance();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw new TajoInternalError(e);
+ }
+ SERDE_MAP.put(fragmentKind, serde);
}
- if (fragmentClass == null) {
- throw new TajoInternalError("No such a fragment for " + dataFormat.toLowerCase());
+ if (serde == null) {
+ throw new TajoInternalError("No such a serde for " + fragmentKind);
}
- return fragmentClass;
+ return serde;
}
- public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) {
- T result;
+ public static <T extends Fragment> T convert(Configuration conf, String fragmentKind, FragmentProto fragment) {
+ FragmentSerde serde = getFragmentSerde(conf, fragmentKind);
try {
- Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
- if (constructor == null) {
- constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
- constructor.setAccessible(true);
- CONSTRUCTOR_CACHE.put(clazz, constructor);
- }
- result = constructor.newInstance(new Object[]{fragment.getContents()});
- } catch (Throwable e) {
+ return (T) serde.deserialize(
+ serde.newBuilder()
+ .mergeFrom(fragment.getContents())
+ .build());
+ } catch (InvalidProtocolBufferException e) {
throw new TajoInternalError(e);
}
-
- return result;
}
public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment) {
- Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getDataFormat().toLowerCase());
- if (fragmentClass == null) {
- throw new TajoInternalError("No such a fragment class for " + fragment.getDataFormat());
- }
- return convert(fragmentClass, fragment);
+ return convert(conf, fragment.getKind(), fragment);
}
- public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments)
- throws IOException {
+ public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) {
List<T> list = Lists.newArrayList();
if (fragments == null) {
return list;
}
for (FragmentProto proto : fragments) {
- list.add(convert(clazz, proto));
+ list.add(convert(conf, proto));
}
return list;
}
- public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) {
- List<T> list = Lists.newArrayList();
- if (fragments == null) {
- return list;
- }
- for (FragmentProto proto : fragments) {
- list.add((T) convert(conf, proto));
- }
- return list;
+ public static FragmentProto toFragmentProto(Configuration conf, Fragment fragment) {
+ FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+ fragmentBuilder.setId(fragment.getInputSourceId());
+ fragmentBuilder.setKind(fragment.getKind());
+ fragmentBuilder.setContents(getFragmentSerde(conf, fragment.getKind()).serialize(fragment).toByteString());
+ return fragmentBuilder.build();
}
- public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) {
+ public static List<FragmentProto> toFragmentProtoList(Configuration conf, Fragment... fragments) {
List<FragmentProto> list = Lists.newArrayList();
if (fragments == null) {
return list;
}
for (Fragment fragment : fragments) {
- list.add(fragment.getProto());
+ list.add(toFragmentProto(conf, fragment));
}
return list;
}
- public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
- List<FragmentProto> list = toFragmentProtoList(fragments);
+ public static FragmentProto [] toFragmentProtoArray(Configuration conf, Fragment... fragments) {
+ List<FragmentProto> list = toFragmentProtoList(conf, fragments);
return list.toArray(new FragmentProto[list.size()]);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
new file mode 100644
index 0000000..c570c9c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.GeneratedMessage.Builder;
+
+/**
+ * FragmentSerde abstracts how a fragment is serialized / deserialized to / from a Protocol Buffer message.
+ *
+ * @param <F> Fragment class
+ * @param <P> Protocol Buffer Message class corresponding to the Fragment class
+ */
+public interface FragmentSerde<F extends Fragment, P extends Message> {
+
+ /**
+ * Creates a new builder of {@code P}.
+ *
+ * @return a Protocol Buffer message builder
+ */
+ Builder newBuilder();
+
+ /**
+ * Serializes a fragment into a Protocol Buffer message.
+ *
+ * @param fragment
+ * @return a serialized Protocol Buffer message
+ */
+ P serialize(F fragment);
+
+ /**
+ * Deserializes a Protocol Buffer message to a fragment.
+ *
+ * @param proto
+ * @return a deserialized fragment instance
+ */
+ F deserialize(P proto);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index 2454714..4e57204 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -44,52 +44,28 @@
<!--- Fragment Class Configurations -->
<property>
- <name>tajo.storage.fragment.text.class</name>
+ <name>tajo.storage.fragment.kind.file</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.json.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.raw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.draw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.rcfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.row.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.parquet.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.orc.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.sequencefile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.avro.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.serde.file</name>
+ <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+ <name>tajo.storage.fragment.serde.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.jdbc.class</name>
- <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
+ <name>tajo.storage.fragment.serde.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
</property>
<!--- Scanner Handler -->
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index 1c4530a..5bf6b0b 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -43,52 +43,28 @@
<!--- Fragment Class Configurations -->
<property>
- <name>tajo.storage.fragment.text.class</name>
+ <name>tajo.storage.fragment.kind.file</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.json.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.raw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.draw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.rcfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.row.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.parquet.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.orc.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.sequencefile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.kind.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
</property>
<property>
- <name>tajo.storage.fragment.avro.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ <name>tajo.storage.fragment.serde.file</name>
+ <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+ <name>tajo.storage.fragment.serde.hbase</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
</property>
<property>
- <name>tajo.storage.fragment.jdbc.class</name>
- <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
+ <name>tajo.storage.fragment.serde.jdbc</name>
+ <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
</property>
<!--- Scanner Handler -->
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
index 18aa515..e1026bb 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
@@ -19,112 +19,49 @@
package org.apache.tajo.storage.hbase;
import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto;
+import org.apache.tajo.storage.hbase.HBaseFragment.HBaseFragmentKey;
import java.net.URI;
-public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
- @Expose
- private URI uri;
- @Expose
- private String tableName;
- @Expose
+/**
+ * Fragment for HBase
+ */
+public class HBaseFragment extends Fragment<HBaseFragmentKey> {
private String hbaseTableName;
- @Expose
- private byte[] startRow;
- @Expose
- private byte[] stopRow;
- @Expose
- private String regionLocation;
- @Expose
private boolean last;
- @Expose
- private long length;
public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
String regionLocation) {
- this.uri = uri;
- this.tableName = tableName;
+ super(BuiltinFragmentKinds.HBASE, uri, tableName, new HBaseFragmentKey(startRow), new HBaseFragmentKey(stopRow),
+ TajoConstants.UNKNOWN_LENGTH, new String[]{regionLocation});
+
this.hbaseTableName = hbaseTableName;
- this.startRow = startRow;
- this.stopRow = stopRow;
- this.regionLocation = regionLocation;
this.last = false;
}
- public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
- HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
- builder.mergeFrom(raw);
- builder.build();
- init(builder.build());
- }
-
- private void init(HBaseFragmentProto proto) {
- this.uri = URI.create(proto.getUri());
- this.tableName = proto.getTableName();
- this.hbaseTableName = proto.getHbaseTableName();
- this.startRow = proto.getStartRow().toByteArray();
- this.stopRow = proto.getStopRow().toByteArray();
- this.regionLocation = proto.getRegionLocation();
- this.length = proto.getLength();
- this.last = proto.getLast();
- }
-
- @Override
- public int compareTo(HBaseFragment t) {
- return Bytes.compareTo(startRow, t.startRow);
- }
-
- public URI getUri() {
- return uri;
- }
-
- @Override
- public String getTableName() {
- return tableName;
- }
-
- @Override
- public String getKey() {
- return new String(startRow);
+ public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
+ String regionLocation, boolean last) {
+ this(uri, tableName, hbaseTableName, startRow, stopRow, regionLocation);
+ this.last = last;
}
@Override
public boolean isEmpty() {
- return startRow == null || stopRow == null;
- }
-
- @Override
- public long getLength() {
- return length;
+ return startKey.isEmpty() || endKey.isEmpty();
}
public void setLength(long length) {
this.length = length;
}
- @Override
- public String[] getHosts() {
- return new String[] {regionLocation};
- }
-
public Object clone() throws CloneNotSupportedException {
HBaseFragment frag = (HBaseFragment) super.clone();
- frag.uri = uri;
- frag.tableName = tableName;
frag.hbaseTableName = hbaseTableName;
- frag.startRow = startRow;
- frag.stopRow = stopRow;
- frag.regionLocation = regionLocation;
frag.last = last;
- frag.length = length;
return frag;
}
@@ -132,9 +69,9 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
public boolean equals(Object o) {
if (o instanceof HBaseFragment) {
HBaseFragment t = (HBaseFragment) o;
- if (tableName.equals(t.tableName)
- && Bytes.equals(startRow, t.startRow)
- && Bytes.equals(stopRow, t.stopRow)) {
+ if (inputSourceId.equals(t.inputSourceId)
+ && startKey.equals(t.startKey)
+ && endKey.equals(t.endKey)) {
return true;
}
}
@@ -143,51 +80,19 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
@Override
public int hashCode() {
- return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow);
+ return Objects.hashCode(inputSourceId, hbaseTableName, startKey, endKey);
}
@Override
public String toString() {
return
- "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ tableName +
+ "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ inputSourceId +
"\", hbaseTableName\": \"" + hbaseTableName + "\"" +
- ", \"startRow\": \"" + new String(startRow) + "\"" +
- ", \"stopRow\": \"" + new String(stopRow) + "\"" +
+ ", \"startRow\": \"" + new String(startKey.bytes) + "\"" +
+ ", \"stopRow\": \"" + new String(endKey.bytes) + "\"" +
", \"length\": \"" + length + "\"}" ;
}
- @Override
- public FragmentProto getProto() {
- HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
- builder
- .setUri(uri.toString())
- .setTableName(tableName)
- .setHbaseTableName(hbaseTableName)
- .setStartRow(ByteString.copyFrom(startRow))
- .setStopRow(ByteString.copyFrom(stopRow))
- .setLast(last)
- .setLength(length)
- .setRegionLocation(regionLocation);
-
- FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
- fragmentBuilder.setId(this.tableName);
- fragmentBuilder.setContents(builder.buildPartial().toByteString());
- fragmentBuilder.setDataFormat(BuiltinStorages.HBASE);
- return fragmentBuilder.build();
- }
-
- public byte[] getStartRow() {
- return startRow;
- }
-
- public byte[] getStopRow() {
- return stopRow;
- }
-
- public String getRegionLocation() {
- return regionLocation;
- }
-
public boolean isLast() {
return last;
}
@@ -200,15 +105,51 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
return hbaseTableName;
}
- public void setHbaseTableName(String hbaseTableName) {
- this.hbaseTableName = hbaseTableName;
- }
-
public void setStartRow(byte[] startRow) {
- this.startRow = startRow;
+ this.startKey = new HBaseFragmentKey(startRow);
}
public void setStopRow(byte[] stopRow) {
- this.stopRow = stopRow;
+ this.endKey = new HBaseFragmentKey(stopRow);
+ }
+
+ public static class HBaseFragmentKey implements Comparable<HBaseFragmentKey> {
+ private final byte[] bytes;
+
+ public HBaseFragmentKey(byte[] key) {
+ this.bytes = key;
+ }
+
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(bytes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof HBaseFragmentKey) {
+ HBaseFragmentKey other = (HBaseFragmentKey) o;
+ return Bytes.equals(bytes, other.bytes);
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(HBaseFragmentKey o) {
+ return Bytes.compareTo(bytes, o.bytes);
+ }
+
+ @Override
+ public String toString() {
+ return new String(bytes);
+ }
+
+ public boolean isEmpty() {
+ return this.bytes == null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
new file mode 100644
index 0000000..f896f43
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage.Builder;
+import org.apache.tajo.storage.fragment.FragmentSerde;
+import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto;
+
+import java.net.URI;
+
+/**
+ * {@link FragmentSerde} implementation that converts between {@link HBaseFragment} and its
+ * protobuf representation, {@link HBaseFragmentProto}.
+ */
+public class HBaseFragmentSerde implements FragmentSerde<HBaseFragment, HBaseFragmentProto> {
+
+  /**
+   * @return a new, empty {@link HBaseFragmentProto} builder
+   */
+  @Override
+  public Builder newBuilder() {
+    return HBaseFragmentProto.newBuilder();
+  }
+
+  /**
+   * Copies the fields of the given fragment into a new {@link HBaseFragmentProto}.
+   * Only the first entry of {@code fragment.getHostNames()} is written, as the region location.
+   *
+   * @param fragment fragment to convert
+   * @return the built protobuf message
+   */
+  @Override
+  public HBaseFragmentProto serialize(HBaseFragment fragment) {
+    final HBaseFragmentProto.Builder message = HBaseFragmentProto.newBuilder();
+    message.setUri(fragment.getUri().toASCIIString());
+    message.setTableName(fragment.getInputSourceId());
+    message.setHbaseTableName(fragment.getHbaseTableName());
+    message.setStartRow(ByteString.copyFrom(fragment.getStartKey().getBytes()));
+    message.setStopRow(ByteString.copyFrom(fragment.getEndKey().getBytes()));
+    message.setLast(fragment.isLast());
+    message.setLength(fragment.getLength());
+    message.setRegionLocation(fragment.getHostNames().get(0));
+    return message.build();
+  }
+
+  /**
+   * Rebuilds an {@link HBaseFragment} from the fields of the given protobuf message.
+   *
+   * @param proto message to convert
+   * @return a new fragment built from the message's URI, table names, row range,
+   *         region location and last flag
+   */
+  @Override
+  public HBaseFragment deserialize(HBaseFragmentProto proto) {
+    final URI tableUri = URI.create(proto.getUri());
+    final String inputSourceId = proto.getTableName();
+    final String hbaseTable = proto.getHbaseTableName();
+    final byte[] firstRow = proto.getStartRow().toByteArray();
+    final byte[] lastRow = proto.getStopRow().toByteArray();
+    final String regionHost = proto.getRegionLocation();
+    final boolean isLastFragment = proto.getLast();
+    return new HBaseFragment(
+        tableUri, inputSourceId, hbaseTable, firstRow, lastRow, regionHost, isLastFragment);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index b2ca02d..781f911 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -36,10 +36,16 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.exception.*;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BytesUtils;
@@ -175,16 +181,16 @@ public class HBaseScanner implements Scanner {
}
}
- scan.setStartRow(fragment.getStartRow());
- if (fragment.isLast() && fragment.getStopRow() != null &&
- fragment.getStopRow().length > 0) {
+ scan.setStartRow(fragment.getStartKey().getBytes());
+ if (fragment.isLast() && !fragment.getEndKey().isEmpty() &&
+ fragment.getEndKey().getBytes().length > 0) {
// last and stopRow is not empty
if (filters == null) {
filters = new FilterList();
}
- filters.addFilter(new InclusiveStopFilter(fragment.getStopRow()));
+ filters.addFilter(new InclusiveStopFilter(fragment.getEndKey().getBytes()));
} else {
- scan.setStopRow(fragment.getStopRow());
+ scan.setStopRow(fragment.getEndKey().getBytes());
}
if (filters != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 0cf883e..e3f7c25 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -525,10 +525,10 @@ public class HBaseTablespace extends Tablespace {
if (fragmentMap.containsKey(regionStartKey)) {
final HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
- if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+ if (Bytes.compareTo(fragmentStart, prevFragment.getStartKey().getBytes()) < 0) {
prevFragment.setStartRow(fragmentStart);
}
- if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
+ if (Bytes.compareTo(fragmentStop, prevFragment.getEndKey().getBytes()) > 0) {
prevFragment.setStopRow(fragmentStop);
}
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index f3cb9a5..17c413e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -453,8 +453,8 @@ public class FileTablespace extends Tablespace {
/**
* Get Disk Ids by Volume Bytes
*/
- private int[] getDiskIds(VolumeId[] volumeIds) {
- int[] diskIds = new int[volumeIds.length];
+ private Integer[] getDiskIds(VolumeId[] volumeIds) {
+ Integer[] diskIds = new Integer[volumeIds.length];
for (int i = 0; i < volumeIds.length; i++) {
int diskId = -1;
if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c44272b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 6d8443e..a6850c1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -92,7 +92,7 @@ public class RawFile {
fis = new FileInputStream(file);
channel = fis.getChannel();
filePosition = startOffset = fragment.getStartKey();
- endOffset = fragment.getStartKey() + fragment.getLength();
+ endOffset = fragment.getEndKey();
if (LOG.isDebugEnabled()) {
LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()