Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/19 13:48:52 UTC

[04/13] tajo git commit: TAJO-324: Rename the prefix 'QueryUnit' to Task.
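
For orientation: this commit renames the identifier classes QueryUnitId and QueryUnitAttemptId to TaskId and TaskAttemptId, and the accessor getQueryUnitId() to getTaskId(), across the code base. The snippet below is a minimal sketch, not part of the commit, showing how the renamed types fit together; it only uses constructors and factory methods that appear in the hunks that follow.

    import org.apache.tajo.ExecutionBlockId;
    import org.apache.tajo.LocalTajoTestingUtility;
    import org.apache.tajo.QueryIdFactory;
    import org.apache.tajo.TaskAttemptId;
    import org.apache.tajo.TaskId;
    import org.apache.tajo.engine.planner.global.MasterPlan;

    public class TaskIdRenameSketch {
      public static void main(String[] args) {
        // A query is divided into execution blocks, each of which runs tasks.
        MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
        ExecutionBlockId ebId = plan.newExecutionBlockId();

        // Formerly QueryIdFactory.newQueryUnitId(ebId), returning a QueryUnitId.
        TaskId taskId = QueryIdFactory.newTaskId(ebId);

        // Formerly new QueryUnitAttemptId(queryUnitId, attemptSeq).
        TaskAttemptId attemptId = new TaskAttemptId(taskId, 0);

        // Navigating back up the id hierarchy; formerly getQueryUnitId().
        System.out.println(attemptId.getTaskId().getExecutionBlockId());
      }
    }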

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
index 2b3887f..25a2fbc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskId;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.InterDataRetriever;
@@ -80,8 +80,8 @@ public class TestHttpDataServer {
   public final void testInterDataRetriver() throws Exception {
     MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
     ExecutionBlockId schid = plan.newExecutionBlockId();
-    QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
-    QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);
+    TaskId qid1 = QueryIdFactory.newTaskId(schid);
+    TaskId qid2 = QueryIdFactory.newTaskId(schid);
     
     File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
     qid1Dir.mkdirs();
@@ -121,8 +121,8 @@ public class TestHttpDataServer {
   public final void testNoSuchFile() throws Exception {
     MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
     ExecutionBlockId schid = plan.newExecutionBlockId();
-    QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
-    QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);
+    TaskId qid1 = QueryIdFactory.newTaskId(schid);
+    TaskId qid2 = QueryIdFactory.newTaskId(schid);
     
     File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
     qid1Dir.mkdirs();
@@ -154,7 +154,7 @@ public class TestHttpDataServer {
     assertDataRetrival(qid1, addr.getPort(), watermark1);
   }
   
-  private static void assertDataRetrival(QueryUnitId id, int port, 
+  private static void assertDataRetrival(TaskId id, int port,
       String watermark) throws IOException {
     URL url = new URL("http://127.0.0.1:"+port
         + "/?qid=" + id.toString() + "&fn=testHttp");

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
deleted file mode 100644
index f6d0eb3..0000000
--- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
+++ /dev/null
@@ -1 +0,0 @@
-select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
deleted file mode 100644
index c3e09f1..0000000
--- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
+++ /dev/null
@@ -1,5 +0,0 @@
-select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum
-(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as
- count_order
-from lineitem
-group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
deleted file mode 100644
index a0f9c78..0000000
--- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
+++ /dev/null
@@ -1,10 +0,0 @@
-select *
-from (
-  select a.col1, a.col2, a.key
-  from ColumnPartitionedTable a
-  join ColumnPartitionedTable b on a.key = b.key
-  where
-    (a.key = 45.0 or a.key = 38.0)
-) test
-order by
-  col1, col2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql
new file mode 100644
index 0000000..f6d0eb3
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql
@@ -0,0 +1 @@
+select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql
new file mode 100644
index 0000000..c3e09f1
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql
@@ -0,0 +1,5 @@
+select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum
+(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as
+ count_order
+from lineitem
+group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql
new file mode 100644
index 0000000..a0f9c78
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql
@@ -0,0 +1,10 @@
+select *
+from (
+  select a.col1, a.col2, a.key
+  from ColumnPartitionedTable a
+  join ColumnPartitionedTable b on a.key = b.key
+  where
+    (a.key = 45.0 or a.key = 38.0)
+) test
+order by
+  col1, col2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index c69c7d9..c408b16 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -225,8 +225,8 @@
                     <argument>Tajo</argument>
                     <argument>org.apache.tajo.master.querymaster.Query,
                       org.apache.tajo.master.querymaster.SubQuery,
-                      org.apache.tajo.master.querymaster.QueryUnit,
-                      org.apache.tajo.master.querymaster.QueryUnitAttempt
+                      org.apache.tajo.master.querymaster.Task,
+                      org.apache.tajo.master.querymaster.TaskAttempt
                     </argument>
                     <argument>Tajo.gv</argument>
                   </arguments>

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
index 31db15c..bfb70b4 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -232,7 +232,7 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
 
   private List<String> splitMaps(List<String> qids) {
     if (null == qids) {
-      LOG.error("QueryUnitId is EMPTY");
+      LOG.error("QueryId is EMPTY");
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
index 67e7423..5591bba 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
@@ -23,8 +23,8 @@ import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
 import org.apache.tajo.pullserver.FileAccessForbiddenException;
 import org.apache.tajo.util.TajoIdUtils;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -75,13 +75,13 @@ public class AdvancedDataRetriever implements DataRetriever {
 
     if (params.containsKey("sid")) {
       List<FileChunk> chunks = Lists.newArrayList();
-      List<String> queryUnidIds = splitMaps(params.get("qid"));
-      for (String eachQueryUnitId : queryUnidIds) {
-        String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_");
+      List<String> taskIds = splitMaps(params.get("qid"));
+      for (String eachTaskId : taskIds) {
+        String[] taskIdSeqTokens = eachTaskId.split("_");
         ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
-        QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0]));
+        TaskId quid = new TaskId(ebId, Integer.parseInt(taskIdSeqTokens[0]));
 
-        QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1]));
+        TaskAttemptId attemptId = new TaskAttemptId(quid, Integer.parseInt(taskIdSeqTokens[1]));
 
         RetrieverHandler handler = handlerMap.get(attemptId.toString());
         FileChunk chunk = handler.get(params);
@@ -113,7 +113,7 @@ public class AdvancedDataRetriever implements DataRetriever {
 
   private List<String> splitMaps(List<String> qids) {
     if (null == qids) {
-      LOG.error("QueryUnitId is EMPTY");
+      LOG.error("QueryId is EMPTY");
       return null;
     }
 

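The AdvancedDataRetriever hunk above parses each "qid" request parameter as "<task-seq>_<attempt-seq>" and resolves it against the execution block named by the "sid" parameter. A condensed sketch of that parsing, using the same calls as the loop body above (the standalone helper itself is hypothetical and not part of the commit):

    import org.apache.tajo.ExecutionBlockId;
    import org.apache.tajo.TaskAttemptId;
    import org.apache.tajo.TaskId;
    import org.apache.tajo.util.TajoIdUtils;

    final class QidParsing {  // hypothetical holder class for illustration
      static TaskAttemptId parseQid(String sid, String qid) {
        ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(sid);
        String[] tokens = qid.split("_");  // "<task-seq>_<attempt-seq>"
        TaskId taskId = new TaskId(ebId, Integer.parseInt(tokens[0]));
        return new TaskAttemptId(taskId, Integer.parseInt(tokens[1]));
      }
    }
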
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index e2d89d6..07a51ba 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -68,7 +68,7 @@ public abstract class StorageManager {
 
   private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
       Configuration.class,
-      QueryUnitAttemptId.class,
+      TaskAttemptId.class,
       Schema.class,
       TableMeta.class,
       Path.class
@@ -446,7 +446,7 @@ public abstract class StorageManager {
    * @throws java.io.IOException
    */
   public Appender getAppender(OverridableConf queryContext,
-                              QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+                              TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
       throws IOException {
     Appender appender;
 
@@ -511,7 +511,7 @@ public abstract class StorageManager {
    * @param <T>
    * @return The scanner instance
    */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId,
+  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
                                           TableMeta meta, Schema schema, Path workDir) {
     T result;
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 2b196c9..af3d623 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -158,8 +158,8 @@ public class StorageUtil extends StorageConstants {
     Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1);
     String pathName = lastFile.getName();
 
-    // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>
-    // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
+    // 0.8: pathName = part-<ExecutionBlockId.seq>-<TaskId.seq>
+    // 0.9: pathName = part-<ExecutionBlockId.seq>-<TaskId.seq>-<Sequence>
     String[] pathTokens = pathName.split("-");
     if (pathTokens.length == 3) {
       return -1;

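The StorageUtil comment above records the partition-file naming convention: 0.8 produces part-<ExecutionBlockId.seq>-<TaskId.seq>, while 0.9 appends -<Sequence>. Below is a sketch of telling the two forms apart by token count, following the length check in the hunk; parsing the fourth token as the sequence in the 0.9 case is an assumption, since that branch is not shown here:

    // Hypothetical standalone sketch; plain string handling only, no Tajo classes.
    static int fileSequence(String pathName) {
      String[] pathTokens = pathName.split("-");
      if (pathTokens.length == 3) {
        return -1;                             // 0.8-style: part-<eb>-<task>, no sequence suffix
      }
      return Integer.parseInt(pathTokens[3]);  // assumed 0.9-style: part-<eb>-<task>-<seq>
    }

    // e.g. fileSequence("part-01-000003-002") == 2, fileSequence("part-01-000003") == -1
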
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
index 8615235..4836352 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -21,7 +21,7 @@ package org.apache.tajo.storage.hbase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -45,7 +45,7 @@ public abstract class AbstractHBaseAppender implements Appender {
   protected Configuration conf;
   protected Schema schema;
   protected TableMeta meta;
-  protected QueryUnitAttemptId taskAttemptId;
+  protected TaskAttemptId taskAttemptId;
   protected Path stagingDir;
   protected boolean inited = false;
 
@@ -72,7 +72,7 @@ public abstract class AbstractHBaseAppender implements Appender {
 
   protected KeyValue[] keyValues;
 
-  public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+  public AbstractHBaseAppender(Configuration conf, TaskAttemptId taskAttemptId,
                        Schema schema, TableMeta meta, Path stagingDir) {
     this.conf = conf;
     this.schema = schema;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 50f61a8..45a1bff 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -38,7 +38,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
   private HTableInterface htable;
   private long totalNumBytes;
 
-  public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+  public HBasePutAppender(Configuration conf, TaskAttemptId taskAttemptId,
                           Schema schema, TableMeta meta, Path stagingDir) {
     super(conf, taskAttemptId, schema, meta, stagingDir);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
index a6e7a81..de4b4cb 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -539,7 +539,7 @@ public class HBaseStorageManager extends StorageManager {
 
   @Override
   public Appender getAppender(OverridableConf queryContext,
-                              QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+                              TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
       throws IOException {
     if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
       return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
index 07f7988..36678e4 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -53,7 +53,7 @@ public class HFileAppender extends AbstractHBaseAppender {
   private Path workingFilePath;
   private FileOutputCommitter committer;
 
-  public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+  public HFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
                        Schema schema, TableMeta meta, Path stagingDir) {
     super(conf, taskAttemptId, schema, meta, stagingDir);
   }
@@ -66,10 +66,10 @@ public class HFileAppender extends AbstractHBaseAppender {
     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
     taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
 
-    ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId();
+    ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
     writerContext = new TaskAttemptContextImpl(taskConf,
         new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
-            taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId()));
+            taskAttemptId.getTaskId().getId(), taskAttemptId.getId()));
 
     HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index 4bf4c99..28c263c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -76,7 +76,7 @@ public class CSVFile {
     private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
     private SerializerDeserializer serde;
 
-    public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+    public CSVAppender(Configuration conf, final TaskAttemptId taskAttemptId,
                        final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
       super(conf, taskAttemptId, schema, meta, workDir);
       this.fs = workDir.getFileSystem(conf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 47f67c6..b208a71 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
@@ -38,12 +38,12 @@ public abstract class FileAppender implements Appender {
   protected final TableMeta meta;
   protected final Schema schema;
   protected final Path workDir;
-  protected final QueryUnitAttemptId taskAttemptId;
+  protected final TaskAttemptId taskAttemptId;
 
   protected boolean enabledStats;
   protected Path path;
 
-  public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
+  public FileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema,
                       TableMeta meta, Path workDir) {
     this.conf = conf;
     this.meta = meta;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
index 060bf16..3d4f7d5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.tajo.OverridableConf;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -268,7 +268,7 @@ public class FileStorageManager extends StorageManager {
   /////////////////////////////////////////////////////////////////////////////
   // FileInputFormat Area
   /////////////////////////////////////////////////////////////////////////////
-  public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) {
+  public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) {
     if (taskAttemptId == null) {
       // For testcase
       return workDir;
@@ -277,8 +277,8 @@ public class FileStorageManager extends StorageManager {
     // where ss is the subquery id associated with this task, and nnnnnn is the task id.
     Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
         OUTPUT_FILE_PREFIX +
-            OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
-            OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" +
+            OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" +
+            OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" +
             OUTPUT_FILE_FORMAT_SEQ.get().format(0));
     LOG.info("Output File Path: " + outFilePath);
 

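The FileStorageManager hunk above builds each task's output file name as part-<ExecutionBlockId.seq>-<TaskId.seq>-<Sequence>; per the code comment, the execution-block segment is two digits and the task segment six. A rough illustration of the resulting name follows, with the caveat that the exact DecimalFormat patterns behind the OUTPUT_FILE_FORMAT_* thread-locals are not visible in the hunk, so the sequence-segment padding is an assumption:

    // Hypothetical illustration only; the real code uses the OUTPUT_FILE_FORMAT_*
    // formatters shown in the hunk above.
    static String outputFileName(int ebSeq, int taskSeq, int fileSeq) {
      return String.format("part-%02d-%06d-%03d", ebSeq, taskSeq, fileSeq);
    }

    // e.g. outputFileName(2, 5, 0) -> "part-02-000005-000"
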
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
index 33b2750..4c772c9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -21,7 +21,7 @@ package org.apache.tajo.storage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.util.Pair;
 
@@ -42,7 +42,7 @@ public class HashShuffleAppender implements Appender {
   private TableStats tableStats;
 
   //<taskId,<page start offset,<task start, task end>>>
-  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+  private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
 
   //page start offset, length
   private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
@@ -69,7 +69,7 @@ public class HashShuffleAppender implements Appender {
   @Override
   public void init() throws IOException {
     currentPage = new Pair(0L, 0);
-    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
+    taskTupleIndexes = new HashMap<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
     rowNumInPage = 0;
   }
 
@@ -81,7 +81,7 @@ public class HashShuffleAppender implements Appender {
    * @return written bytes
    * @throws java.io.IOException
    */
-  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
+  public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException {
     synchronized(appender) {
       if (closed.get()) {
         return 0;
@@ -189,7 +189,7 @@ public class HashShuffleAppender implements Appender {
     return pages;
   }
 
-  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
+  public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
     return taskTupleIndexes;
   }
 
@@ -203,7 +203,7 @@ public class HashShuffleAppender implements Appender {
     return merged;
   }
 
-  public void taskFinished(QueryUnitAttemptId taskId) {
+  public void taskFinished(TaskAttemptId taskId) {
     taskTupleIndexes.remove(taskId);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 636ae0f..74190bc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
@@ -154,10 +154,10 @@ public class HashShuffleAppenderManager {
     return intermEntries;
   }
 
-  public void finalizeTask(QueryUnitAttemptId taskId) {
+  public void finalizeTask(TaskAttemptId taskId) {
     synchronized (appenderMap) {
       Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
-        appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
+        appenderMap.get(taskId.getTaskId().getExecutionBlockId());
       if (partitionAppenderMap == null) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index cb86f35..45e07d3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -465,7 +465,7 @@ public class RawFile {
 
     private TableStatistics stats;
 
-    public RawFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+    public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
                            Schema schema, TableMeta meta, Path workDir) throws IOException {
       super(conf, taskAttemptId, schema, meta, workDir);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 8da6ada..5510cbf 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -314,7 +314,7 @@ public class RowFile {
     // statistics
     private TableStatistics stats;
 
-    public RowFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+    public RowFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
                            final Schema schema, final TableMeta meta, final Path workDir)
         throws IOException {
       super(conf, taskAttemptId, schema, meta, workDir);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index dbb8bd0..69399dc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -61,7 +61,7 @@ public class AvroAppender extends FileAppender {
    * @param workDir The path of the Parquet file to write to.
    */
   public AvroAppender(Configuration conf,
-                      QueryUnitAttemptId taskAttemptId,
+                      TaskAttemptId taskAttemptId,
                       org.apache.tajo.catalog.Schema schema,
                       TableMeta meta, Path workDir) throws IOException {
     super(conf, taskAttemptId, schema, meta, workDir);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index b10d423..ef5203c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.storage.parquet;
 
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.storage.StorageConstants;
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.metadata.CompressionCodecName;
@@ -54,7 +54,7 @@ public class ParquetAppender extends FileAppender {
    * @param meta The table metadata.
    * @param workDir The path of the Parquet file to write to.
    */
-  public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta,
+  public ParquetAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta,
                          Path workDir) throws IOException {
     super(conf, taskAttemptId, schema, meta, workDir);
     this.blockSize = Integer.parseInt(

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index d88223b..2c09100 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -710,7 +710,7 @@ public class RCFile {
       return out.getPos();
     }
 
-    public RCFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+    public RCFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
                           final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
       super(conf, taskAttemptId, schema, meta, workDir);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index 14e0f26..8b5d677 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -74,7 +74,7 @@ public class SequenceFileAppender extends FileAppender {
 
   private Writable EMPTY_KEY;
 
-  public SequenceFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+  public SequenceFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
                               Schema schema, TableMeta meta, Path workDir) throws IOException {
     super(conf, taskAttemptId, schema, meta, workDir);
     this.meta = meta;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 8824e3e..59129d1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -30,14 +30,13 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
 import org.apache.tajo.util.ReflectionUtil;
@@ -114,7 +113,7 @@ public class DelimitedTextFile {
     private NonSyncByteArrayOutputStream os;
     private TextLineSerializer serializer;
 
-    public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+    public DelimitedTextFileAppender(Configuration conf, TaskAttemptId taskAttemptId,
                                      final Schema schema, final TableMeta meta, final Path path)
         throws IOException {
       super(conf, taskAttemptId, schema, meta, path);