You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/29 11:25:51 UTC
[3/3] git commit: TAJO-287: Improve Fragment to be more generic. (hyunsik)
TAJO-287: Improve Fragment to be more generic. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3c22d3eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3c22d3eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3c22d3eb
Branch: refs/heads/master
Commit: 3c22d3eba437e9130ee74de8f935c7fc330b1217
Parents: 2f09450
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Oct 28 21:46:34 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Oct 29 19:20:00 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tajo/catalog/json/TableMetaAdapter.java | 1 +
.../src/main/proto/CatalogProtos.proto | 5 +
.../org/apache/tajo/client/ResultSetImpl.java | 12 +-
.../engine/planner/PhysicalPlannerImpl.java | 30 ++-
.../planner/physical/BSTIndexScanExec.java | 3 +-
.../planner/physical/ExternalSortExec.java | 10 +-
.../engine/planner/physical/SeqScanExec.java | 13 +-
.../tajo/engine/query/QueryUnitRequest.java | 6 +-
.../tajo/engine/query/QueryUnitRequestImpl.java | 20 +-
.../apache/tajo/master/TaskSchedulerImpl.java | 8 +-
.../tajo/master/event/SubQuerySucceeEvent.java | 2 +
.../tajo/master/querymaster/QueryUnit.java | 26 +-
.../tajo/master/querymaster/Repartitioner.java | 24 +-
.../tajo/master/querymaster/SubQuery.java | 8 +-
.../java/org/apache/tajo/util/IndexUtil.java | 6 +-
.../main/java/org/apache/tajo/worker/Task.java | 47 +---
.../apache/tajo/worker/TaskAttemptContext.java | 44 ++--
.../planner/physical/TestBNLJoinExec.java | 13 +-
.../planner/physical/TestBSTIndexExec.java | 12 +-
.../planner/physical/TestExternalSortExec.java | 5 +-
.../physical/TestFullOuterHashJoinExec.java | 25 +-
.../physical/TestFullOuterMergeJoinExec.java | 37 +--
.../planner/physical/TestHashAntiJoinExec.java | 7 +-
.../planner/physical/TestHashJoinExec.java | 7 +-
.../planner/physical/TestHashSemiJoinExec.java | 7 +-
.../physical/TestLeftOuterHashJoinExec.java | 31 +--
.../physical/TestLeftOuterNLJoinExec.java | 36 +--
.../planner/physical/TestMergeJoinExec.java | 7 +-
.../engine/planner/physical/TestNLJoinExec.java | 13 +-
.../planner/physical/TestPhysicalPlanner.java | 95 +++----
.../physical/TestRightOuterHashJoinExec.java | 24 +-
.../physical/TestRightOuterMergeJoinExec.java | 37 +--
.../engine/planner/physical/TestSortExec.java | 5 +-
.../apache/tajo/storage/TestFileFragment.java | 97 +++++++
.../org/apache/tajo/storage/TestFragment.java | 100 --------
.../org/apache/tajo/storage/TestRowFile.java | 7 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 9 +-
.../tajo/storage/AbstractStorageManager.java | 116 +++++----
.../java/org/apache/tajo/storage/CSVFile.java | 17 +-
.../org/apache/tajo/storage/FileAppender.java | 2 +-
.../org/apache/tajo/storage/FileScanner.java | 5 +-
.../java/org/apache/tajo/storage/Fragment.java | 249 ------------------
.../org/apache/tajo/storage/MergeScanner.java | 23 +-
.../java/org/apache/tajo/storage/RawFile.java | 13 +-
.../java/org/apache/tajo/storage/RowFile.java | 15 +-
.../java/org/apache/tajo/storage/Storage.java | 3 +-
.../org/apache/tajo/storage/StorageManager.java | 3 +-
.../tajo/storage/StorageManagerFactory.java | 5 +-
.../tajo/storage/fragment/FileFragment.java | 250 +++++++++++++++++++
.../apache/tajo/storage/fragment/Fragment.java | 31 +++
.../storage/fragment/FragmentConvertor.java | 123 +++++++++
.../tajo/storage/rcfile/RCFileWrapper.java | 19 +-
.../tajo/storage/trevni/TrevniAppender.java | 4 +-
.../tajo/storage/trevni/TrevniScanner.java | 6 +-
.../apache/tajo/storage/v2/CSVFileScanner.java | 17 +-
.../apache/tajo/storage/v2/FileScannerV2.java | 14 +-
.../apache/tajo/storage/v2/RCFileScanner.java | 14 +-
.../tajo/storage/v2/StorageManagerV2.java | 4 +-
.../src/main/resources/storage-default.xml | 26 +-
.../tajo/storage/TestCompressionStorages.java | 11 +-
.../apache/tajo/storage/TestMergeScanner.java | 9 +-
.../apache/tajo/storage/TestStorageManager.java | 2 +-
.../org/apache/tajo/storage/TestStorages.java | 11 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 65 ++---
.../index/TestSingleCSVFileBSTIndex.java | 13 +-
.../apache/tajo/storage/rcfile/TestRCFile.java | 2 +-
.../tajo/storage/v2/TestCSVCompression.java | 11 +-
.../apache/tajo/storage/v2/TestCSVScanner.java | 5 +-
.../apache/tajo/storage/v2/TestStorages.java | 11 +-
.../src/test/resources/storage-default.xml | 37 ++-
71 files changed, 1117 insertions(+), 860 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3f309cc..9f14a4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-287: Improve Fragment to be more generic. (hyunsik)
+
TAJO-274: Maintaining connectivity to Tajo master regardless of the restart
of the Tajo master. (Keuntae Park via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
index a3337c8..ce42bea 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/json/TableMetaAdapter.java
@@ -20,6 +20,7 @@ package org.apache.tajo.catalog.json;
import com.google.gson.*;
import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.json.GsonSerDerAdapter;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 3de5409..9c57bf3 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -82,6 +82,11 @@ message KeyValueSetProto {
}
message FragmentProto {
+ required string id = 1;
+ required bytes contents = 2;
+}
+
+message FileFragmentProto {
required string id = 1;
required string path = 2;
required int64 startOffset = 3;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
index b713f71..bfe0cdc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/ResultSetImpl.java
@@ -31,7 +31,7 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.MergeScanner;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.Tuple;
@@ -72,8 +72,8 @@ public class ResultSetImpl implements ResultSet {
if(desc != null) {
fs = desc.getPath().getFileSystem(conf);
this.totalRow = desc.getStats() != null ? desc.getStats().getNumRows() : 0;
- Collection<Fragment> frags = getFragments(desc.getMeta(), desc.getPath());
- scanner = new MergeScanner(conf, desc.getMeta(), schema, frags);
+ Collection<FileFragment> frags = getFragments(desc.getMeta(), desc.getPath());
+ scanner = new MergeScanner(conf, schema, desc.getMeta(), frags);
}
init();
}
@@ -91,9 +91,9 @@ public class ResultSetImpl implements ResultSet {
}
}
- private Collection<Fragment> getFragments(TableMeta meta, Path tablePath)
+ private Collection<FileFragment> getFragments(TableMeta meta, Path tablePath)
throws IOException {
- List<Fragment> fraglist = Lists.newArrayList();
+ List<FileFragment> fraglist = Lists.newArrayList();
FileStatus[] files = fs.listStatus(tablePath, new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -107,7 +107,7 @@ public class ResultSetImpl implements ResultSet {
if (files[i].getLen() == 0) {
continue;
}
- fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
+ fraglist.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
}
return fraglist;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 4a3940c..db58e32 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -27,6 +27,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
@@ -37,14 +40,15 @@ import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.IndexUtil;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
@@ -177,12 +181,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) {
+ private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
long size = 0;
for (String tableId : tableIds) {
- Fragment[] fragments = ctx.getTables(tableId);
- for (Fragment frag : fragments) {
- size += frag.getLength();
+ // TODO - CSV is a hack.
+ List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV,
+ ctx.getTables(tableId));
+ for (FileFragment frag : fragments) {
+ size += frag.getEndKey();
}
}
return size;
@@ -636,7 +642,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
"Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
- Fragment[] fragments = ctx.getTables(scanNode.getCanonicalName());
+ FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
return new SeqScanExec(ctx, sm, scanNode, fragments);
}
@@ -751,17 +757,17 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
"Error: There is no table matched to %s", annotation.getCanonicalName());
- Fragment[] fragments = ctx.getTables(annotation.getTableName());
+ FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
+ List<FileFragment> fragments =
+ FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
- String indexName = IndexUtil.getIndexNameOfFrag(fragments[0],
- annotation.getSortKeys());
+ String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
annotation.getSortKeys());
- return new BSTIndexScanExec(ctx, sm, annotation, fragments[0], new Path(
- indexPath, indexName), annotation.getKeySchema(), comp,
- annotation.getDatum());
+ return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName),
+ annotation.getKeySchema(), comp, annotation.getDatum());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 2d81180..5de6634 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
@@ -48,7 +49,7 @@ public class BSTIndexScanExec extends PhysicalExec {
public BSTIndexScanExec(TaskAttemptContext context,
AbstractStorageManager sm , ScanNode scanNode ,
- Fragment fragment, Path fileName , Schema keySchema,
+ FileFragment fragment, Path fileName , Schema keySchema,
TupleComparator comparator , Datum[] datum) throws IOException {
super(context, scanNode.getInSchema(), scanNode.getOutSchema());
this.scanNode = scanNode;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 99df92e..0a8cb62 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -75,7 +75,7 @@ public class ExternalSortExec extends SortExec {
// So, I add the scheme 'file:/' to path. But, it should be improved.
Path localPath = new Path(sortTmpDir + "/0_" + chunkId);
- appender = new RawFile.RawFileAppender(context.getConf(), meta, inSchema, localPath);
+ appender = new RawFile.RawFileAppender(context.getConf(), inSchema, meta, localPath);
appender.init();
for (Tuple t : tupleSlots) {
@@ -147,7 +147,7 @@ public class ExternalSortExec extends SortExec {
Path leftChunk = getChunkPath(level, chunkId);
Path rightChunk = getChunkPath(level, chunkId + 1);
- appender = new RawFile.RawFileAppender(context.getConf(), meta, inSchema, nextChunk);
+ appender = new RawFile.RawFileAppender(context.getConf(), inSchema, meta, nextChunk);
appender.init();
merge(appender, leftChunk, rightChunk);
@@ -167,7 +167,7 @@ public class ExternalSortExec extends SortExec {
}
Path result = getChunkPath(level, 0);
- this.result = new RawFile.RawFileScanner(context.getConf(), meta, plan.getInSchema(), result);
+ this.result = new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, result);
sorted = true;
}
@@ -176,10 +176,10 @@ public class ExternalSortExec extends SortExec {
private void merge(RawFile.RawFileAppender appender, Path left, Path right)
throws IOException {
- RawFile.RawFileScanner leftScan = new RawFile.RawFileScanner(context.getConf(), meta, plan.getInSchema(), left);
+ RawFile.RawFileScanner leftScan = new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, left);
RawFile.RawFileScanner rightScan =
- new RawFile.RawFileScanner(context.getConf(), meta, plan.getInSchema(), right);
+ new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, right);
Tuple leftTuple = leftScan.next();
Tuple rightTuple = rightScan.next();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 2045602..22af472 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,6 +18,9 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -28,7 +31,6 @@ import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.storage.*;
-import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.util.HashSet;
@@ -41,13 +43,13 @@ public class SeqScanExec extends PhysicalExec {
private EvalNode qual = null;
private EvalContext qualCtx;
- private Fragment [] fragments;
+ private CatalogProtos.FragmentProto [] fragments;
private Projector projector;
private EvalContext [] evalContexts;
public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
- ScanNode plan, Fragment [] fragments) throws IOException {
+ ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
this.plan = plan;
@@ -88,8 +90,9 @@ public class SeqScanExec extends PhysicalExec {
this.evalContexts = projector.renew();
if (fragments.length > 1) {
- this.scanner = new MergeScanner(context.getConf(), plan.getTableDesc().getMeta(), plan.getTableSchema(),
- TUtil.newList(fragments));
+ this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
+ FragmentConvertor.<FileFragment>convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(),
+ fragments));
} else {
this.scanner = StorageManagerFactory.getStorageManager(
context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getTableSchema(), fragments[0], projected);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
index edc5f3d..383a787 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
@@ -21,12 +21,12 @@
*/
package org.apache.tajo.engine.query;
-import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.storage.Fragment;
import java.net.URI;
import java.util.List;
@@ -34,7 +34,7 @@ import java.util.List;
public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
public QueryUnitAttemptId getId();
- public List<Fragment> getFragments();
+ public List<CatalogProtos.FragmentProto> getFragments();
public String getOutputTableId();
public boolean isClusteredOutput();
public String getSerializedData();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index f9cc550..3c3c3dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -18,22 +18,24 @@
package org.apache.tajo.engine.query;
-import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.Fragment;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
public class QueryUnitRequestImpl implements QueryUnitRequest {
private QueryUnitAttemptId id;
- private List<Fragment> fragments;
+ private List<FragmentProto> fragments;
private String outputTable;
private boolean isUpdated;
private boolean clusteredOutput;
@@ -55,7 +57,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
this.isUpdated = false;
}
- public QueryUnitRequestImpl(QueryUnitAttemptId id, List<Fragment> fragments,
+ public QueryUnitRequestImpl(QueryUnitAttemptId id, List<FragmentProto> fragments,
String outputTable, boolean clusteredOutput,
String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
this();
@@ -69,7 +71,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
isUpdated = false;
}
- public void set(QueryUnitAttemptId id, List<Fragment> fragments,
+ public void set(QueryUnitAttemptId id, List<FragmentProto> fragments,
String outputTable, boolean clusteredOutput,
String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
this.id = id;
@@ -106,16 +108,16 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
}
@Override
- public List<Fragment> getFragments() {
+ public List<FragmentProto> getFragments() {
QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
if (fragments != null) {
return fragments;
}
if (fragments == null) {
- fragments = new ArrayList<Fragment>();
+ fragments = new ArrayList<FragmentProto>();
}
for (int i = 0; i < p.getFragmentsCount(); i++) {
- fragments.add(new Fragment(p.getFragments(i)));
+ fragments.add(p.getFragments(i));
}
return this.fragments;
}
@@ -284,7 +286,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
}
if (fragments != null) {
for (int i = 0; i < fragments.size(); i++) {
- builder.addFragments(fragments.get(i).getProto());
+ builder.addFragments(fragments.get(i));
}
}
if (this.outputTable != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index e09dd69..1d53c87 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -41,7 +42,8 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.NetUtils;
import java.net.URI;
@@ -49,6 +51,8 @@ import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
public class TaskSchedulerImpl extends AbstractService
implements TaskScheduler {
private static final Log LOG = LogFactory.getLog(TaskScheduleEvent.class);
@@ -461,7 +465,7 @@ public class TaskSchedulerImpl extends AbstractService
QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
- new ArrayList<Fragment>(task.getAllFragments()),
+ new ArrayList<FragmentProto>(task.getAllFragments()),
"",
false,
task.getLogicalPlan().toJson(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
index 5262a07..2485421 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
@@ -19,6 +19,8 @@
package org.apache.tajo.master.event;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.master.querymaster.SubQueryState;
public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index ff0d120..5c6f79f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -28,12 +28,14 @@ import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.QueryUnitId;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.TajoIdUtils;
import java.net.URI;
@@ -44,6 +46,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
public class QueryUnit implements EventHandler<TaskEvent> {
/** Class Logger */
private static final Log LOG = LogFactory.getLog(QueryUnit.class);
@@ -54,7 +58,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private LogicalNode plan = null;
private List<ScanNode> scan;
- private Map<String, Fragment> fragMap;
+ private Map<String, FragmentProto> fragMap;
private Map<String, Set<URI>> fetchMap;
private List<Partition> partitions;
@@ -130,7 +134,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.isLeafTask;
}
- public void setDataLocations(Fragment fragment) {
+ public void setDataLocations(FileFragment fragment) {
String[] hosts = fragment.getHosts();
int[] blockCount = fragment.getHostsBlockCount();
int[] volumeIds = fragment.getDiskIds();
@@ -176,13 +180,13 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
@Deprecated
- public void setFragment(String tableId, Fragment fragment) {
- this.fragMap.put(tableId, fragment);
+ public void setFragment(String tableId, FileFragment fragment) {
+ this.fragMap.put(tableId, fragment.getProto());
setDataLocations(fragment);
}
- public void setFragment2(Fragment fragment) {
- this.fragMap.put(fragment.getName(), fragment);
+ public void setFragment2(FileFragment fragment) {
+ this.fragMap.put(fragment.getTableName(), fragment.getProto());
setDataLocations(fragment);
}
@@ -220,12 +224,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
this.fetchMap.clear();
this.fetchMap.putAll(fetches);
}
-
- public Fragment getFragment(String tableId) {
- return this.fragMap.get(tableId);
- }
- public Collection<Fragment> getAllFragments() {
+ public Collection<FragmentProto> getAllFragments() {
return fragMap.values();
}
@@ -268,7 +268,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
@Override
public String toString() {
String str = new String(plan.getType() + " \n");
- for (Entry<String, Fragment> e : fragMap.entrySet()) {
+ for (Entry<String, FragmentProto> e : fragMap.entrySet()) {
str += e.getKey() + " : ";
str += e.getValue() + " ";
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index d0cca20..ef14c31 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -40,7 +40,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
@@ -75,7 +75,7 @@ public class Repartitioner {
ScanNode[] scans = execBlock.getScanNodes();
Path tablePath;
- Fragment [] fragments = new Fragment[2];
+ FileFragment[] fragments = new FileFragment[2];
TableStats[] stats = new TableStats[2];
// initialize variables from the child operators
@@ -89,7 +89,7 @@ public class Repartitioner {
tablePath = storageManager.getTablePath(scans[i].getTableName());
stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
- fragments[i] = new Fragment(scans[i].getCanonicalName(), tablePath, 0, 0);
+ fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0);
} else {
tablePath = tableDesc.getPath();
stats[i] = tableDesc.getStats();
@@ -198,7 +198,7 @@ public class Repartitioner {
return tasks;
}
- private static QueryUnit [] createLeafTasksWithBroadcastTable(SubQuery subQuery, int baseScanId, Fragment broadcasted) throws IOException {
+ private static QueryUnit [] createLeafTasksWithBroadcastTable(SubQuery subQuery, int baseScanId, FileFragment broadcasted) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
ScanNode[] scans = execBlock.getScanNodes();
Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
@@ -210,13 +210,13 @@ public class Repartitioner {
meta = desc.getMeta();
FileSystem fs = inputPath.getFileSystem(subQuery.getContext().getConf());
- List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+ List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
inputPath);
QueryUnit queryUnit;
List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
int i = 0;
- for (Fragment fragment : fragments) {
+ for (FileFragment fragment : fragments) {
queryUnit = newQueryUnit(subQuery, i++, fragment);
queryUnit.setFragment2(broadcasted);
queryUnits.add(queryUnit);
@@ -224,7 +224,7 @@ public class Repartitioner {
return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
}
- private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
+ private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit unit = new QueryUnit(
QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
@@ -234,7 +234,7 @@ public class Repartitioner {
return unit;
}
- private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, Fragment [] fragments, int taskNum) {
+ private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, FileFragment[] fragments, int taskNum) {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit [] tasks = new QueryUnit[taskNum];
for (int i = 0; i < taskNum; i++) {
@@ -242,7 +242,7 @@ public class Repartitioner {
QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
subQuery.getEventHandler());
tasks[i].setLogicalPlan(execBlock.getPlan());
- for (Fragment fragment : fragments) {
+ for (FileFragment fragment : fragments) {
tasks[i].setFragment2(fragment);
}
}
@@ -342,7 +342,7 @@ public class Repartitioner {
" sub ranges (total units: " + determinedTaskNum + ")");
TupleRange [] ranges = partitioner.partition(determinedTaskNum);
- Fragment dummyFragment = new Fragment(scan.getTableName(), tablePath, 0, 0);
+ FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0);
List<String> basicFetchURIs = new ArrayList<String>();
@@ -438,7 +438,7 @@ public class Repartitioner {
tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
- Fragment frag = new Fragment(scan.getCanonicalName(), tablePath, 0, 0);
+ FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0);
Map<String, List<IntermediateEntry>> hashedByHost;
@@ -563,7 +563,7 @@ public class Repartitioner {
}
public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
- Fragment frag) {
+ FileFragment frag) {
LogicalNode plan = subQuery.getBlock().getPlan();
QueryUnit [] tasks = new QueryUnit[num];
for (int i = 0; i < num; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 0f1cbde..61872ab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -53,7 +53,7 @@ import org.apache.tajo.master.TaskScheduler;
import org.apache.tajo.master.TaskSchedulerImpl;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
import java.io.IOException;
import java.util.*;
@@ -642,14 +642,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
meta = desc.getMeta();
// TODO - should be change the inner directory
- List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
+ List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
inputPath);
QueryUnit queryUnit;
List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
int i = 0;
- for (Fragment fragment : fragments) {
+ for (FileFragment fragment : fragments) {
queryUnit = newQueryUnit(subQuery, i++, fragment);
queryUnits.add(queryUnit);
}
@@ -657,7 +657,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
}
- private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
+ private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit unit = new QueryUnit(
QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
index 62bcf2d..dc77700 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -28,17 +28,17 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.logical.IndexScanNode;
import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.FileFragment;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map.Entry;
public class IndexUtil {
- public static String getIndexNameOfFrag(Fragment fragment, SortSpec[] keys) {
+ public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
StringBuilder builder = new StringBuilder();
builder.append(fragment.getPath().getName() + "_");
- builder.append(fragment.getStartOffset() + "_" + fragment.getLength() + "_");
+ builder.append(fragment.getStartKey() + "_" + fragment.getEndKey() + "_");
for(int i = 0 ; i < keys.length ; i ++) {
builder.append(keys[i].getSortKey().getColumnName()+"_");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 922719d..b5516f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -45,9 +45,9 @@ import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.ApplicationIdUtils;
import java.io.File;
@@ -58,6 +58,8 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
public class Task {
private static final Log LOG = LogFactory.getLog(Task.class);
@@ -141,8 +143,7 @@ public class Task {
taskId.getQueryUnitId().getId() + "_" + taskId.getId());
this.context = new TaskAttemptContext(systemConf, taskId,
- request.getFragments().toArray(new Fragment[request.getFragments().size()]),
- taskDir);
+ request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
this.context.setDataChannel(request.getDataChannel());
this.context.setEnforcer(request.getEnforcer());
@@ -159,7 +160,7 @@ public class Task {
StoreTableNode store = (StoreTableNode) plan;
this.partitionType = store.getPartitionType();
if (partitionType == PartitionType.RANGE_PARTITION) {
- SortNode sortNode = (SortNode) store.getChild();
+ SortNode sortNode = store.getChild();
this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
}
@@ -181,11 +182,6 @@ public class Task {
+ (interQuery ? ", Use " + this.partitionType + " partitioning":""));
LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
- for (Fragment f: request.getFragments()) {
- TableDesc desc = descs.get(f.getName());
- LOG.info("Table Id:" + f.getName() + ", path:" + desc.getPath() + "(" + desc.getMeta().getStoreType() + "), " +
- "(start:" + f.getStartOffset() + ", length: " + f.getLength() + ")");
- }
LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
for (Fetch f : request.getFetches()) {
LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls());
@@ -237,25 +233,6 @@ public class Task {
public void localize(QueryUnitRequest request) throws IOException {
fetcherRunners = getFetchRunners(context, request.getFetches());
-
- List<Fragment> cached = Lists.newArrayList();
- for (Fragment frag : request.getFragments()) {
- if (frag.isDistCached()) {
- cached.add(frag);
- }
- }
-
- if (cached.size() > 0) {
- Path inFile;
-
- int i = fetcherRunners.size();
- for (Fragment cache : cached) {
- inFile = new Path(inputTableBaseDir, "in_" + i);
- taskRunnerContext.getDefaultFS().copyToLocalFile(cache.getPath(), inFile);
- cache.setPath(inFile);
- i++;
- }
- }
}
public QueryUnitAttemptId getId() {
@@ -354,8 +331,8 @@ public class Task {
Collection<String> inputs = Lists.newArrayList(context.getInputTables());
for (String inputTable: inputs) {
File tableDir = new File(context.getFetchIn(), inputTable);
- Fragment [] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
- context.changeFragment(inputTable, frags);
+ FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+ context.updateAssignedFragments(inputTable, frags);
}
}
@@ -449,26 +426,26 @@ public class Task {
return false;
}
- private Fragment[] localizeFetchedData(File file, String name, TableMeta meta)
+ private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
throws IOException {
Configuration c = new Configuration(systemConf);
c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
FileSystem fs = FileSystem.get(c);
Path tablePath = new Path(file.getAbsolutePath());
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
FileStatus[] fileLists = fs.listStatus(tablePath);
for (FileStatus f : fileLists) {
if (f.getLen() == 0) {
continue;
}
- tablet = new Fragment(name, f.getPath(), 0l, f.getLen());
+ tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
listTablets.add(tablet);
}
- Fragment[] tablets = new Fragment[listTablets.size()];
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
listTablets.toArray(tablets);
return tablets;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index f05e1a1..86d8cf2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -18,6 +18,7 @@
package org.apache.tajo.worker;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
@@ -29,13 +30,16 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import java.io.File;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
/**
* Contains the information about executing subquery.
@@ -43,7 +47,7 @@ import java.util.concurrent.CountDownLatch;
public class TaskAttemptContext {
private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
private final TajoConf conf;
- private final Map<String, List<Fragment>> fragmentMap = new HashMap<String, List<Fragment>>();
+ private final Map<String, List<FragmentProto>> fragmentMap = Maps.newHashMap();
private TaskAttemptState state;
private TableStats resultStats;
@@ -61,18 +65,18 @@ public class TaskAttemptContext {
private Enforcer enforcer;
public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
- final Fragment[] fragments,
+ final FragmentProto[] fragments,
final Path workDir) {
this.conf = conf;
this.queryId = queryId;
- for(Fragment t : fragments) {
- if (fragmentMap.containsKey(t.getName())) {
- fragmentMap.get(t.getName()).add(t);
+ for(FragmentProto t : fragments) {
+ if (fragmentMap.containsKey(t.getId())) {
+ fragmentMap.get(t.getId()).add(t);
} else {
- List<Fragment> frags = new ArrayList<Fragment>();
+ List<FragmentProto> frags = new ArrayList<FragmentProto>();
frags.add(t);
- fragmentMap.put(t.getName(), frags);
+ fragmentMap.put(t.getId(), frags);
}
}
@@ -82,6 +86,12 @@ public class TaskAttemptContext {
state = TaskAttemptState.TA_PENDING;
}
+ @VisibleForTesting
+ public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
+ final Fragment [] fragments, final Path workDir) {
+ this(conf, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+ }
+
public TajoConf getConf() {
return this.conf;
}
@@ -173,15 +183,15 @@ public class TaskAttemptContext {
return repartitions.entrySet().iterator();
}
- public void changeFragment(String tableId, Fragment [] fragments) {
+ public void updateAssignedFragments(String tableId, Fragment[] fragments) {
fragmentMap.remove(tableId);
for(Fragment t : fragments) {
- if (fragmentMap.containsKey(t.getName())) {
- fragmentMap.get(t.getName()).add(t);
+ if (fragmentMap.containsKey(t.getTableName())) {
+ fragmentMap.get(t.getTableName()).add(t.getProto());
} else {
- List<Fragment> frags = new ArrayList<Fragment>();
- frags.add(t);
- fragmentMap.put(t.getName(), frags);
+ List<FragmentProto> frags = new ArrayList<FragmentProto>();
+ frags.add(t.getProto());
+ fragmentMap.put(t.getTableName(), frags);
}
}
}
@@ -202,7 +212,7 @@ public class TaskAttemptContext {
this.progress = progress;
}
- public Fragment getTable(String id) {
+ public FragmentProto getTable(String id) {
return fragmentMap.get(id).get(0);
}
@@ -214,8 +224,8 @@ public class TaskAttemptContext {
return fragmentMap.keySet();
}
- public Fragment [] getTables(String id) {
- return fragmentMap.get(id).toArray(new Fragment[fragmentMap.get(id).size()]);
+ public FragmentProto [] getTables(String id) {
+ return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
}
public int hashCode() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 4d2bd6c..5f0457e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -140,11 +141,11 @@ public class TestBNLJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
@@ -170,11 +171,11 @@ public class TestBNLJoinExec {
Expr context = analyzer.parse(QUERIES[1]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 615befa..c9499d6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -47,6 +49,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@@ -156,10 +159,10 @@ public class TestBSTIndexExec {
this.rndKey = rnd.nextInt(250);
final String QUERY = "select * from employee where managerId = " + rndKey;
- Fragment[] frags = StorageManager.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
+ FileFragment[] frags = StorageManager.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
- LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
Expr expr = analyzer.parse(QUERY);
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
@@ -192,11 +195,12 @@ public class TestBSTIndexExec {
Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
"Error: There is no table matched to %s", scanNode.getTableName());
- Fragment[] fragments = ctx.getTables(scanNode.getTableName());
+ List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(),
+ ctx.getTables(scanNode.getTableName()));
Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
- return new BSTIndexScanExec(ctx, sm, scanNode, fragments[0], idxPath,
+ return new BSTIndexScanExec(ctx, sm, scanNode, fragments.get(0), idxPath,
idxSchema, comp , datum);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 282f5be..e236126 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -109,11 +110,11 @@ public class TestExternalSortExec {
@Test
public final void testNext() throws IOException, PlanningException {
- Fragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+ FileFragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
TaskAttemptContext ctx = new TaskAttemptContext(conf,
- LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
LogicalPlan plan = planner.createPlan(expr);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index c7fec48..63f14b7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -253,9 +254,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec0");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -290,9 +291,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuter_HashJoinExec1");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -326,9 +327,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec2");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -363,10 +364,10 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec3");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(), merged,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 61802fd..c92d1c9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -296,9 +297,9 @@ public class TestFullOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -331,9 +332,9 @@ public class TestFullOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -366,9 +367,9 @@ public class TestFullOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -402,9 +403,9 @@ public class TestFullOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -440,10 +441,10 @@ public class TestFullOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin4");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -477,10 +478,10 @@ public class TestFullOuterMergeJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
+ FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin5");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index e521a94..57f2376 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -140,12 +141,12 @@ public class TestHashAntiJoinExec {
@Test
public final void testHashAntiJoin() throws IOException, PlanningException {
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index ba02c21..2a1af7c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -140,9 +141,9 @@ public class TestHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index e4dcb95..d03f3c2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -144,12 +145,12 @@ public class TestHashSemiJoinExec {
@Test
public final void testHashSemiJoin() throws IOException, PlanningException {
- Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
- Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 58f9b3b..4ac423b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
@@ -254,9 +255,9 @@ public class TestLeftOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterHashJoinExec0");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -282,12 +283,12 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
Integer.MAX_VALUE);
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec1");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -322,12 +323,12 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec2");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -363,12 +364,12 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+ FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec3");
TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -404,12 +405,12 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
- Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(),
Integer.MAX_VALUE);
- Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(),
Integer.MAX_VALUE);
- Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+ FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec4");
TaskAttemptContext ctx = new TaskAttemptContext(conf,