You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/19 13:48:52 UTC
[04/13] tajo git commit: TAJO-324: Rename the prefix 'QueryUnit' to
Task.
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
index 2b3887f..25a2fbc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskId;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.InterDataRetriever;
@@ -80,8 +80,8 @@ public class TestHttpDataServer {
public final void testInterDataRetriver() throws Exception {
MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
ExecutionBlockId schid = plan.newExecutionBlockId();
- QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
- QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);
+ TaskId qid1 = QueryIdFactory.newTaskId(schid);
+ TaskId qid2 = QueryIdFactory.newTaskId(schid);
File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
qid1Dir.mkdirs();
@@ -121,8 +121,8 @@ public class TestHttpDataServer {
public final void testNoSuchFile() throws Exception {
MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
ExecutionBlockId schid = plan.newExecutionBlockId();
- QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
- QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);
+ TaskId qid1 = QueryIdFactory.newTaskId(schid);
+ TaskId qid2 = QueryIdFactory.newTaskId(schid);
File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
qid1Dir.mkdirs();
@@ -154,7 +154,7 @@ public class TestHttpDataServer {
assertDataRetrival(qid1, addr.getPort(), watermark1);
}
- private static void assertDataRetrival(QueryUnitId id, int port,
+ private static void assertDataRetrival(TaskId id, int port,
String watermark) throws IOException {
URL url = new URL("http://127.0.0.1:"+port
+ "/?qid=" + id.toString() + "&fn=testHttp");
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
deleted file mode 100644
index f6d0eb3..0000000
--- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
+++ /dev/null
@@ -1 +0,0 @@
-select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
deleted file mode 100644
index c3e09f1..0000000
--- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
+++ /dev/null
@@ -1,5 +0,0 @@
-select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum
-(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as
- count_order
-from lineitem
-group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
deleted file mode 100644
index a0f9c78..0000000
--- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
+++ /dev/null
@@ -1,10 +0,0 @@
-select *
-from (
- select a.col1, a.col2, a.key
- from ColumnPartitionedTable a
- join ColumnPartitionedTable b on a.key = b.key
- where
- (a.key = 45.0 or a.key = 38.0)
-) test
-order by
- col1, col2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql
new file mode 100644
index 0000000..f6d0eb3
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql
@@ -0,0 +1 @@
+select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql
new file mode 100644
index 0000000..c3e09f1
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql
@@ -0,0 +1,5 @@
+select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum
+(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as
+ count_order
+from lineitem
+group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql
new file mode 100644
index 0000000..a0f9c78
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql
@@ -0,0 +1,10 @@
+select *
+from (
+ select a.col1, a.col2, a.key
+ from ColumnPartitionedTable a
+ join ColumnPartitionedTable b on a.key = b.key
+ where
+ (a.key = 45.0 or a.key = 38.0)
+) test
+order by
+ col1, col2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index c69c7d9..c408b16 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -225,8 +225,8 @@
<argument>Tajo</argument>
<argument>org.apache.tajo.master.querymaster.Query,
org.apache.tajo.master.querymaster.SubQuery,
- org.apache.tajo.master.querymaster.QueryUnit,
- org.apache.tajo.master.querymaster.QueryUnitAttempt
+ org.apache.tajo.master.querymaster.Task,
+ org.apache.tajo.master.querymaster.TaskAttempt
</argument>
<argument>Tajo.gv</argument>
</arguments>
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
index 31db15c..bfb70b4 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -232,7 +232,7 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
private List<String> splitMaps(List<String> qids) {
if (null == qids) {
- LOG.error("QueryUnitId is EMPTY");
+ LOG.error("QueryId is EMPTY");
return null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
index 67e7423..5591bba 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
@@ -23,8 +23,8 @@ import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
import org.apache.tajo.pullserver.FileAccessForbiddenException;
import org.apache.tajo.util.TajoIdUtils;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -75,13 +75,13 @@ public class AdvancedDataRetriever implements DataRetriever {
if (params.containsKey("sid")) {
List<FileChunk> chunks = Lists.newArrayList();
- List<String> queryUnidIds = splitMaps(params.get("qid"));
- for (String eachQueryUnitId : queryUnidIds) {
- String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_");
+ List<String> taskIds = splitMaps(params.get("qid"));
+ for (String eachTaskId : taskIds) {
+ String[] taskIdSeqTokens = eachTaskId.split("_");
ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
- QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0]));
+ TaskId quid = new TaskId(ebId, Integer.parseInt(taskIdSeqTokens[0]));
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1]));
+ TaskAttemptId attemptId = new TaskAttemptId(quid, Integer.parseInt(taskIdSeqTokens[1]));
RetrieverHandler handler = handlerMap.get(attemptId.toString());
FileChunk chunk = handler.get(params);
@@ -113,7 +113,7 @@ public class AdvancedDataRetriever implements DataRetriever {
private List<String> splitMaps(List<String> qids) {
if (null == qids) {
- LOG.error("QueryUnitId is EMPTY");
+ LOG.error("QueryId is EMPTY");
return null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index e2d89d6..07a51ba 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -68,7 +68,7 @@ public abstract class StorageManager {
private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
Configuration.class,
- QueryUnitAttemptId.class,
+ TaskAttemptId.class,
Schema.class,
TableMeta.class,
Path.class
@@ -446,7 +446,7 @@ public abstract class StorageManager {
* @throws java.io.IOException
*/
public Appender getAppender(OverridableConf queryContext,
- QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+ TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
throws IOException {
Appender appender;
@@ -511,7 +511,7 @@ public abstract class StorageManager {
* @param <T>
* @return The scanner instance
*/
- public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId,
+ public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
TableMeta meta, Schema schema, Path workDir) {
T result;
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 2b196c9..af3d623 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -158,8 +158,8 @@ public class StorageUtil extends StorageConstants {
Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1);
String pathName = lastFile.getName();
- // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>
- // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
+ // 0.8: pathName = part-<ExecutionBlockId.seq>-<TaskId.seq>
+ // 0.9: pathName = part-<ExecutionBlockId.seq>-<TaskId.seq>-<Sequence>
String[] pathTokens = pathName.split("-");
if (pathTokens.length == 3) {
return -1;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
index 8615235..4836352 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -21,7 +21,7 @@ package org.apache.tajo.storage.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -45,7 +45,7 @@ public abstract class AbstractHBaseAppender implements Appender {
protected Configuration conf;
protected Schema schema;
protected TableMeta meta;
- protected QueryUnitAttemptId taskAttemptId;
+ protected TaskAttemptId taskAttemptId;
protected Path stagingDir;
protected boolean inited = false;
@@ -72,7 +72,7 @@ public abstract class AbstractHBaseAppender implements Appender {
protected KeyValue[] keyValues;
- public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ public AbstractHBaseAppender(Configuration conf, TaskAttemptId taskAttemptId,
Schema schema, TableMeta meta, Path stagingDir) {
this.conf = conf;
this.schema = schema;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 50f61a8..45a1bff 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -38,7 +38,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
private HTableInterface htable;
private long totalNumBytes;
- public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ public HBasePutAppender(Configuration conf, TaskAttemptId taskAttemptId,
Schema schema, TableMeta meta, Path stagingDir) {
super(conf, taskAttemptId, schema, meta, stagingDir);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
index a6e7a81..de4b4cb 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -539,7 +539,7 @@ public class HBaseStorageManager extends StorageManager {
@Override
public Appender getAppender(OverridableConf queryContext,
- QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+ TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
throws IOException {
if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
index 07f7988..36678e4 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
@@ -53,7 +53,7 @@ public class HFileAppender extends AbstractHBaseAppender {
private Path workingFilePath;
private FileOutputCommitter committer;
- public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ public HFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
Schema schema, TableMeta meta, Path stagingDir) {
super(conf, taskAttemptId, schema, meta, stagingDir);
}
@@ -66,10 +66,10 @@ public class HFileAppender extends AbstractHBaseAppender {
Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
- ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId();
+ ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
writerContext = new TaskAttemptContextImpl(taskConf,
new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
- taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId()));
+ taskAttemptId.getTaskId().getId(), taskAttemptId.getId()));
HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index 4bf4c99..28c263c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -76,7 +76,7 @@ public class CSVFile {
private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
private SerializerDeserializer serde;
- public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+ public CSVAppender(Configuration conf, final TaskAttemptId taskAttemptId,
final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
super(conf, taskAttemptId, schema, meta, workDir);
this.fs = workDir.getFileSystem(conf);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 47f67c6..b208a71 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
@@ -38,12 +38,12 @@ public abstract class FileAppender implements Appender {
protected final TableMeta meta;
protected final Schema schema;
protected final Path workDir;
- protected final QueryUnitAttemptId taskAttemptId;
+ protected final TaskAttemptId taskAttemptId;
protected boolean enabledStats;
protected Path path;
- public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
+ public FileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema,
TableMeta meta, Path workDir) {
this.conf = conf;
this.meta = meta;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
index 060bf16..3d4f7d5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.tajo.OverridableConf;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -268,7 +268,7 @@ public class FileStorageManager extends StorageManager {
/////////////////////////////////////////////////////////////////////////////
// FileInputFormat Area
/////////////////////////////////////////////////////////////////////////////
- public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) {
+ public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) {
if (taskAttemptId == null) {
// For testcase
return workDir;
@@ -277,8 +277,8 @@ public class FileStorageManager extends StorageManager {
// where ss is the subquery id associated with this task, and nnnnnn is the task id.
Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
OUTPUT_FILE_PREFIX +
- OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
- OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" +
+ OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" +
+ OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" +
OUTPUT_FILE_FORMAT_SEQ.get().format(0));
LOG.info("Output File Path: " + outFilePath);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
index 33b2750..4c772c9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -21,7 +21,7 @@ package org.apache.tajo.storage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.util.Pair;
@@ -42,7 +42,7 @@ public class HashShuffleAppender implements Appender {
private TableStats tableStats;
//<taskId,<page start offset,<task start, task end>>>
- private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+ private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
//page start offset, length
private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
@@ -69,7 +69,7 @@ public class HashShuffleAppender implements Appender {
@Override
public void init() throws IOException {
currentPage = new Pair(0L, 0);
- taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
+ taskTupleIndexes = new HashMap<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
rowNumInPage = 0;
}
@@ -81,7 +81,7 @@ public class HashShuffleAppender implements Appender {
* @return written bytes
* @throws java.io.IOException
*/
- public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
+ public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException {
synchronized(appender) {
if (closed.get()) {
return 0;
@@ -189,7 +189,7 @@ public class HashShuffleAppender implements Appender {
return pages;
}
- public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
+ public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
return taskTupleIndexes;
}
@@ -203,7 +203,7 @@ public class HashShuffleAppender implements Appender {
return merged;
}
- public void taskFinished(QueryUnitAttemptId taskId) {
+ public void taskFinished(TaskAttemptId taskId) {
taskTupleIndexes.remove(taskId);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 636ae0f..74190bc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
@@ -154,10 +154,10 @@ public class HashShuffleAppenderManager {
return intermEntries;
}
- public void finalizeTask(QueryUnitAttemptId taskId) {
+ public void finalizeTask(TaskAttemptId taskId) {
synchronized (appenderMap) {
Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
- appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
+ appenderMap.get(taskId.getTaskId().getExecutionBlockId());
if (partitionAppenderMap == null) {
return;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index cb86f35..45e07d3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -465,7 +465,7 @@ public class RawFile {
private TableStatistics stats;
- public RawFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
Schema schema, TableMeta meta, Path workDir) throws IOException {
super(conf, taskAttemptId, schema, meta, workDir);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 8da6ada..5510cbf 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
@@ -314,7 +314,7 @@ public class RowFile {
// statistics
private TableStatistics stats;
- public RowFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+ public RowFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
final Schema schema, final TableMeta meta, final Path workDir)
throws IOException {
super(conf, taskAttemptId, schema, meta, workDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index dbb8bd0..69399dc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -61,7 +61,7 @@ public class AvroAppender extends FileAppender {
* @param workDir The path of the Parquet file to write to.
*/
public AvroAppender(Configuration conf,
- QueryUnitAttemptId taskAttemptId,
+ TaskAttemptId taskAttemptId,
org.apache.tajo.catalog.Schema schema,
TableMeta meta, Path workDir) throws IOException {
super(conf, taskAttemptId, schema, meta, workDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index b10d423..ef5203c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -18,7 +18,7 @@
package org.apache.tajo.storage.parquet;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.storage.StorageConstants;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.metadata.CompressionCodecName;
@@ -54,7 +54,7 @@ public class ParquetAppender extends FileAppender {
* @param meta The table metadata.
* @param workDir The path of the Parquet file to write to.
*/
- public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta,
+ public ParquetAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta,
Path workDir) throws IOException {
super(conf, taskAttemptId, schema, meta, workDir);
this.blockSize = Integer.parseInt(
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index d88223b..2c09100 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
@@ -710,7 +710,7 @@ public class RCFile {
return out.getPos();
}
- public RCFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+ public RCFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
super(conf, taskAttemptId, schema, meta, workDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index 14e0f26..8b5d677 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -74,7 +74,7 @@ public class SequenceFileAppender extends FileAppender {
private Writable EMPTY_KEY;
- public SequenceFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ public SequenceFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
Schema schema, TableMeta meta, Path workDir) throws IOException {
super(conf, taskAttemptId, schema, meta, workDir);
this.meta = meta;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 8824e3e..59129d1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -30,14 +30,13 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
import org.apache.tajo.util.ReflectionUtil;
@@ -114,7 +113,7 @@ public class DelimitedTextFile {
private NonSyncByteArrayOutputStream os;
private TextLineSerializer serializer;
- public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ public DelimitedTextFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
final Schema schema, final TableMeta meta, final Path path)
throws IOException {
super(conf, taskAttemptId, schema, meta, path);