Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/16 00:42:35 UTC

[13/13] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f674fa8f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f674fa8f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f674fa8f

Branch: refs/heads/index_support
Commit: f674fa8f08622ff752ff56bc6b11a93a520e266c
Parents: 8ec099c dc49049
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Jun 16 07:42:08 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Jun 16 07:42:08 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |    9 +
 .../org/apache/tajo/jdbc/TajoResultSetBase.java |  306 ++--
 .../org/apache/tajo/storage/RowStoreUtil.java   |   93 +-
 .../java/org/apache/tajo/datum/DateDatum.java   |   34 +-
 .../main/java/org/apache/tajo/datum/Datum.java  |    7 +
 .../org/apache/tajo/datum/DatumFactory.java     |    4 +-
 .../java/org/apache/tajo/datum/Float4Datum.java |    6 +-
 .../java/org/apache/tajo/datum/Float8Datum.java |    6 +-
 .../java/org/apache/tajo/datum/Int2Datum.java   |    6 +-
 .../java/org/apache/tajo/datum/Int4Datum.java   |    6 +-
 .../java/org/apache/tajo/datum/Int8Datum.java   |    6 +-
 .../org/apache/tajo/datum/IntervalDatum.java    |    6 +-
 .../java/org/apache/tajo/datum/TimeDatum.java   |   22 +-
 .../org/apache/tajo/datum/TimestampDatum.java   |   45 +-
 .../org/apache/tajo/storage/EmptyTuple.java     |  141 +-
 .../java/org/apache/tajo/storage/NullTuple.java |   43 +-
 .../java/org/apache/tajo/storage/Tuple.java     |   79 +-
 .../java/org/apache/tajo/storage/VTuple.java    |  218 +--
 .../apache/tajo/util/datetime/DateTimeUtil.java |   12 +-
 .../apache/tajo/datum/TestTimestampDatum.java   |    2 +-
 .../apache/tajo/util/TestDateTimeFormat.java    |    2 +-
 .../tajo/engine/function/builtin/AvgDouble.java |   10 +-
 .../tajo/engine/function/builtin/AvgFloat.java  |    5 +-
 .../tajo/engine/function/builtin/AvgInt.java    |    5 +-
 .../tajo/engine/function/builtin/AvgLong.java   |   10 +-
 .../tajo/engine/function/builtin/Coalesce.java  |    5 +-
 .../tajo/engine/function/builtin/CountRows.java |    2 +-
 .../engine/function/builtin/CountValue.java     |    2 +-
 .../function/builtin/CountValueDistinct.java    |   13 +-
 .../tajo/engine/function/builtin/Date.java      |    2 +-
 .../tajo/engine/function/builtin/LastValue.java |    5 +-
 .../tajo/engine/function/builtin/Lead.java      |    6 +-
 .../tajo/engine/function/builtin/Max.java       |   10 +-
 .../tajo/engine/function/builtin/Min.java       |    4 +-
 .../tajo/engine/function/builtin/RandomInt.java |    2 +-
 .../tajo/engine/function/builtin/SumDouble.java |    5 +-
 .../function/builtin/SumDoubleDistinct.java     |   10 +-
 .../tajo/engine/function/builtin/SumFloat.java  |    5 +-
 .../function/builtin/SumFloatDistinct.java      |   10 +-
 .../engine/function/builtin/SumIntDistinct.java |   10 +-
 .../tajo/engine/function/builtin/SumLong.java   |    5 +-
 .../function/builtin/SumLongDistinct.java       |   10 +-
 .../tajo/engine/function/builtin/Variance.java  |   13 +-
 .../tajo/engine/function/datetime/AddDays.java  |    4 +-
 .../engine/function/datetime/AddMonths.java     |    6 +-
 .../function/datetime/DatePartFromDate.java     |   67 +-
 .../function/datetime/DatePartFromTime.java     |   16 +-
 .../datetime/DatePartFromTimestamp.java         |   14 +-
 .../datetime/DateTimePartFromUnixTimestamp.java |   26 +-
 .../function/datetime/ToCharTimestamp.java      |   10 +-
 .../tajo/engine/function/datetime/ToDate.java   |    6 +-
 .../function/datetime/ToTimestampInt.java       |    5 +-
 .../function/datetime/ToTimestampText.java      |    7 +-
 .../function/geoip/GeoIPCountryInet4.java       |    5 +-
 .../engine/function/geoip/GeoIPCountryText.java |    5 +-
 .../function/geoip/GeoIPInCountryInet4.java     |    6 +-
 .../function/geoip/GeoIPInCountryText.java      |    6 +-
 .../function/json/JsonExtractPathText.java      |    9 +-
 .../tajo/engine/function/math/AbsDouble.java    |    5 +-
 .../tajo/engine/function/math/AbsFloat.java     |    5 +-
 .../tajo/engine/function/math/AbsInt.java       |    5 +-
 .../tajo/engine/function/math/AbsLong.java      |    5 +-
 .../apache/tajo/engine/function/math/Acos.java  |    5 +-
 .../apache/tajo/engine/function/math/Asin.java  |    5 +-
 .../apache/tajo/engine/function/math/Atan.java  |    5 +-
 .../apache/tajo/engine/function/math/Atan2.java |    6 +-
 .../apache/tajo/engine/function/math/Cbrt.java  |    5 +-
 .../apache/tajo/engine/function/math/Ceil.java  |    5 +-
 .../apache/tajo/engine/function/math/Cos.java   |    5 +-
 .../tajo/engine/function/math/Degrees.java      |    5 +-
 .../apache/tajo/engine/function/math/Div.java   |   12 +-
 .../apache/tajo/engine/function/math/Exp.java   |    5 +-
 .../apache/tajo/engine/function/math/Floor.java |    5 +-
 .../apache/tajo/engine/function/math/Mod.java   |   12 +-
 .../apache/tajo/engine/function/math/Pow.java   |    6 +-
 .../tajo/engine/function/math/Radians.java      |    5 +-
 .../apache/tajo/engine/function/math/Round.java |    5 +-
 .../tajo/engine/function/math/RoundFloat8.java  |    9 +-
 .../apache/tajo/engine/function/math/Sign.java  |    5 +-
 .../apache/tajo/engine/function/math/Sin.java   |    5 +-
 .../apache/tajo/engine/function/math/Sqrt.java  |    5 +-
 .../apache/tajo/engine/function/math/Tan.java   |    5 +-
 .../tajo/engine/function/string/Ascii.java      |    7 +-
 .../tajo/engine/function/string/BTrim.java      |   10 +-
 .../tajo/engine/function/string/BitLength.java  |    8 +-
 .../tajo/engine/function/string/CharLength.java |    7 +-
 .../apache/tajo/engine/function/string/Chr.java |    7 +-
 .../tajo/engine/function/string/Concat.java     |    9 +-
 .../tajo/engine/function/string/Concat_ws.java  |   20 +-
 .../tajo/engine/function/string/Decode.java     |   17 +-
 .../tajo/engine/function/string/Digest.java     |    8 +-
 .../tajo/engine/function/string/Encode.java     |   18 +-
 .../tajo/engine/function/string/FindInSet.java  |   21 +-
 .../tajo/engine/function/string/InitCap.java    |    7 +-
 .../tajo/engine/function/string/LTrim.java      |   10 +-
 .../tajo/engine/function/string/Left.java       |   14 +-
 .../tajo/engine/function/string/Length.java     |    5 +-
 .../tajo/engine/function/string/Locate.java     |   15 +-
 .../tajo/engine/function/string/Lower.java      |    7 +-
 .../tajo/engine/function/string/Lpad.java       |   23 +-
 .../apache/tajo/engine/function/string/Md5.java |   11 +-
 .../engine/function/string/OctetLength.java     |    5 +-
 .../tajo/engine/function/string/QuoteIdent.java |    6 +-
 .../tajo/engine/function/string/RTrim.java      |   10 +-
 .../engine/function/string/RegexpReplace.java   |   51 +-
 .../tajo/engine/function/string/Repeat.java     |   10 +-
 .../tajo/engine/function/string/Reverse.java    |    7 +-
 .../tajo/engine/function/string/Right.java      |   12 +-
 .../tajo/engine/function/string/Rpad.java       |   31 +-
 .../tajo/engine/function/string/SplitPart.java  |    9 +-
 .../tajo/engine/function/string/StrPos.java     |   15 +-
 .../tajo/engine/function/string/StrPosb.java    |   14 +-
 .../tajo/engine/function/string/Substr.java     |   37 +-
 .../tajo/engine/function/string/ToBin.java      |    5 +-
 .../tajo/engine/function/string/ToCharLong.java |    4 +-
 .../tajo/engine/function/string/ToHex.java      |    5 +-
 .../tajo/engine/function/string/Upper.java      |    7 +-
 .../tajo/engine/function/window/FirstValue.java |    6 +-
 .../apache/tajo/engine/function/window/Lag.java |    8 +-
 .../tajo/engine/function/window/Rank.java       |    2 +-
 .../engine/planner/RangePartitionAlgorithm.java |   66 +-
 .../engine/planner/UniformRangePartition.java   |  208 ++-
 .../planner/physical/BSTIndexScanExec.java      |    4 +
 .../planner/physical/CommonHashJoinExec.java    |    4 +-
 .../planner/physical/ComparableVector.java      |   10 +-
 .../DistinctGroupbyFirstAggregationExec.java    |   14 +-
 .../DistinctGroupbyHashAggregationExec.java     |   10 +-
 .../DistinctGroupbySecondAggregationExec.java   |    8 +-
 .../DistinctGroupbySortAggregationExec.java     |    2 +-
 .../DistinctGroupbyThirdAggregationExec.java    |   10 +-
 .../planner/physical/HashAggregateExec.java     |    4 +-
 .../HashBasedColPartitionStoreExec.java         |    4 +-
 .../engine/planner/physical/HashJoinExec.java   |    6 +-
 .../planner/physical/HashLeftOuterJoinExec.java |    3 +-
 .../planner/physical/HashPartitioner.java       |    2 +-
 .../physical/HashShuffleFileWriteExec.java      |    2 +-
 .../planner/physical/JoinTupleComparator.java   |    4 +-
 .../engine/planner/physical/PhysicalExec.java   |    2 +-
 .../physical/RangeShuffleFileWriteExec.java     |   21 +-
 .../planner/physical/SortAggregateExec.java     |    9 +-
 .../SortBasedColPartitionStoreExec.java         |    5 +-
 .../engine/planner/physical/WindowAggExec.java  |    8 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java |    4 +-
 .../NonForwardQueryResultSystemScanner.java     |   19 +-
 .../apache/tajo/master/exec/QueryExecutor.java  |    2 +-
 .../tajo/master/rm/TajoResourceTracker.java     |    4 +-
 .../tajo/util/TajoUncaughtExceptionHandler.java |   70 +
 .../apache/tajo/util/history/HistoryWriter.java |    2 +-
 .../tajo/worker/ExecutionBlockContext.java      |   83 +-
 .../org/apache/tajo/worker/LegacyTaskImpl.java  |  844 ++++++++++
 .../apache/tajo/worker/NodeResourceManager.java |   45 +-
 .../apache/tajo/worker/NodeStatusUpdater.java   |   34 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   47 +-
 .../tajo/worker/TajoWorkerManagerService.java   |    9 +-
 .../main/java/org/apache/tajo/worker/Task.java  | 1502 ++++++++----------
 .../apache/tajo/worker/TaskAttemptContext.java  |   61 +-
 .../org/apache/tajo/worker/TaskContainer.java   |   85 +
 .../org/apache/tajo/worker/TaskExecutor.java    |  194 +++
 .../java/org/apache/tajo/worker/TaskImpl.java   |  837 ++++++++++
 .../org/apache/tajo/worker/TaskManager.java     |  180 +++
 .../java/org/apache/tajo/worker/TaskRunner.java |   10 +-
 .../apache/tajo/worker/TaskRunnerHistory.java   |    1 +
 .../apache/tajo/worker/TaskRunnerManager.java   |   12 +-
 .../worker/event/ExecutionBlockStartEvent.java  |   35 +
 .../worker/event/ExecutionBlockStopEvent.java   |   37 +
 .../worker/event/NodeResourceAllocateEvent.java |    2 +-
 .../event/NodeResourceDeallocateEvent.java      |    2 +-
 .../tajo/worker/event/NodeResourceEvent.java    |   35 +
 .../worker/event/NodeResourceManagerEvent.java  |   34 -
 .../tajo/worker/event/NodeStatusEvent.java      |   11 +-
 .../tajo/worker/event/TaskExecutorEvent.java    |   44 +
 .../tajo/worker/event/TaskManagerEvent.java     |   43 +
 .../tajo/worker/event/TaskRunnerEvent.java      |    1 +
 .../tajo/worker/event/TaskRunnerStartEvent.java |   44 +-
 .../tajo/worker/event/TaskRunnerStopEvent.java  |    1 +
 .../tajo/worker/event/TaskStartEvent.java       |   44 +
 .../tajo/ws/rs/resources/QueryResource.java     |   37 +-
 .../ws/rs/resources/QueryResultResource.java    |   21 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |    1 +
 .../apache/tajo/engine/eval/ExprTestBase.java   |   11 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |    4 +-
 .../tajo/engine/eval/TestSQLExpression.java     |    7 +-
 .../planner/TestUniformRangePartition.java      |  216 +--
 .../planner/physical/TestBNLJoinExec.java       |   10 +-
 .../planner/physical/TestExternalSortExec.java  |    2 +-
 .../physical/TestFullOuterHashJoinExec.java     |    6 +-
 .../physical/TestFullOuterMergeJoinExec.java    |    8 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   10 +-
 .../planner/physical/TestHashJoinExec.java      |   10 +-
 .../planner/physical/TestHashPartitioner.java   |   10 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   10 +-
 .../physical/TestLeftOuterHashJoinExec.java     |    6 +-
 .../planner/physical/TestMergeJoinExec.java     |   10 +-
 .../engine/planner/physical/TestNLJoinExec.java |   10 +-
 .../planner/physical/TestPhysicalPlanner.java   |   68 +-
 .../physical/TestProgressExternalSortExec.java  |    2 +-
 .../physical/TestRightOuterHashJoinExec.java    |    6 +-
 .../physical/TestRightOuterMergeJoinExec.java   |    8 +-
 .../engine/planner/physical/TestSortExec.java   |    8 +-
 .../planner/physical/TestTupleSorter.java       |    4 +-
 .../apache/tajo/engine/util/TestTupleUtil.java  |   23 +-
 .../TestNonForwardQueryResultSystemScanner.java |   16 +-
 .../apache/tajo/querymaster/TestKillQuery.java  |  135 +-
 .../org/apache/tajo/storage/TestRowFile.java    |    4 +-
 .../apache/tajo/worker/MockExecutionBlock.java  |   42 +
 .../tajo/worker/MockNodeResourceManager.java    |   96 ++
 .../tajo/worker/MockNodeStatusUpdater.java      |    4 +-
 .../apache/tajo/worker/MockTaskExecutor.java    |  141 ++
 .../org/apache/tajo/worker/MockTaskManager.java |   59 +
 .../apache/tajo/worker/MockWorkerContext.java   |  129 ++
 .../org/apache/tajo/worker/TestFetcher.java     |   14 +-
 .../tajo/worker/TestNodeResourceManager.java    |  135 +-
 .../tajo/worker/TestNodeStatusUpdater.java      |   54 +-
 .../apache/tajo/worker/TestTaskExecutor.java    |  330 ++++
 .../org/apache/tajo/worker/TestTaskManager.java |  185 +++
 .../tajo/ws/rs/resources/TestQueryResource.java |    2 +-
 .../rs/resources/TestQueryResultResource.java   |    2 +-
 .../org/apache/tajo/jdbc/MetaDataTuple.java     |   57 +-
 .../apache/tajo/jdbc/TajoMetaDataResultSet.java |   14 +-
 .../org/apache/tajo/plan/ExprAnnotator.java     |    2 +-
 .../org/apache/tajo/plan/expr/FieldEval.java    |    2 +-
 .../plan/function/PythonAggFunctionInvoke.java  |    2 +-
 .../function/python/PythonScriptEngine.java     |    4 +-
 .../plan/function/stream/CSVLineSerializer.java |   12 +-
 .../stream/TextFieldSerializerDeserializer.java |    4 +-
 .../tajo/storage/BaseTupleComparator.java       |    4 +-
 .../storage/BinarySerializerDeserializer.java   |   52 +-
 .../org/apache/tajo/storage/FrameTuple.java     |   76 +-
 .../java/org/apache/tajo/storage/LazyTuple.java |   57 +-
 .../org/apache/tajo/storage/RowStoreUtil.java   |   42 +-
 .../tajo/storage/SerializerDeserializer.java    |    7 +-
 .../apache/tajo/storage/TableStatistics.java    |   31 +-
 .../storage/TextSerializerDeserializer.java     |   67 +-
 .../org/apache/tajo/storage/TupleRange.java     |    2 -
 .../apache/tajo/tuple/offheap/HeapTuple.java    |   51 +-
 .../apache/tajo/tuple/offheap/UnSafeTuple.java  |   55 +-
 .../org/apache/tajo/storage/TestFrameTuple.java |   14 +-
 .../org/apache/tajo/storage/TestLazyTuple.java  |   26 +-
 .../tajo/storage/TestTupleComparator.java       |    7 +-
 .../org/apache/tajo/storage/TestVTuple.java     |   23 +-
 .../tajo/tuple/offheap/TestOffHeapRowBlock.java |   26 +-
 .../storage/hbase/AbstractHBaseAppender.java    |   15 +-
 .../HBaseBinarySerializerDeserializer.java      |   35 +
 .../tajo/storage/hbase/HBasePutAppender.java    |    6 +-
 .../tajo/storage/hbase/HBaseTablespace.java     |    2 +-
 .../hbase/HBaseTextSerializerDeserializer.java  |    9 +
 .../java/org/apache/tajo/storage/CSVFile.java   |    9 +-
 .../storage/FieldSerializerDeserializer.java    |    7 +-
 .../java/org/apache/tajo/storage/RawFile.java   |    4 +-
 .../java/org/apache/tajo/storage/RowFile.java   |   28 +-
 .../apache/tajo/storage/avro/AvroAppender.java  |    5 +-
 .../tajo/storage/json/JsonLineSerializer.java   |    2 +-
 .../tajo/storage/parquet/ParquetAppender.java   |    2 +-
 .../storage/parquet/TajoRecordConverter.java    |    2 +-
 .../tajo/storage/parquet/TajoWriteSupport.java  |   26 +-
 .../org/apache/tajo/storage/rcfile/RCFile.java  |   22 +-
 .../sequencefile/SequenceFileAppender.java      |   23 +-
 .../sequencefile/SequenceFileScanner.java       |    3 +-
 .../tajo/storage/text/CSVLineDeserializer.java  |   11 +-
 .../tajo/storage/text/CSVLineSerializer.java    |    5 +-
 .../text/TextFieldSerializerDeserializer.java   |   38 +-
 .../tajo/storage/TestDelimitedTextFile.java     |    4 +-
 .../tajo/storage/TestFileStorageManager.java    |    5 +-
 .../apache/tajo/storage/TestFileSystems.java    |    9 +-
 .../apache/tajo/storage/TestMergeScanner.java   |   20 +-
 .../org/apache/tajo/storage/TestStorages.java   |   57 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |   92 +-
 .../index/TestSingleCSVFileBSTIndex.java        |   24 +-
 .../apache/tajo/storage/json/TestJsonSerDe.java |    3 +-
 .../tajo/storage/parquet/TestReadWrite.java     |    2 +-
 270 files changed, 6628 insertions(+), 3121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/CHANGES
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 712fc6d,bc6975a..4ed1313
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@@ -233,15 -114,13 +233,19 @@@ public class BSTIndexScanExec extends P
        }
      } else {
         while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
++         LOG.info("while: " + tuple);
           if (qual.eval(tuple).isTrue()) {
             projector.eval(tuple, outTuple);
++           LOG.info("return: " + outTuple);
             return outTuple;
           } else {
             long offset = reader.next();
 -           if (offset == -1) return null;
++           LOG.info("offset: " + offset);
 +           if (offset == -1) {
 +             return null;
 +           }
             else fileScanner.seek(offset);
++           return null;
           }
         }
       }
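
The combined (diff --cc) hunk above is hard to read on its own, so here is a minimal, self-contained Java sketch of the control flow that the merged result produces. The IndexReader, Scanner, Qual, and Projector interfaces below are hypothetical stand-ins for the fields used in the real BSTIndexScanExec (reader, fileScanner, qual, projector, outTuple); only the branch structure mirrors the hunk, and the LOG.info calls it adds are shown as comments.

  // Hypothetical stand-ins for the BSTIndexScanExec collaborators referenced in the hunk.
  interface Tuple {}

  interface IndexReader {
    boolean isCurInMemory();
    long next() throws java.io.IOException;      // next index offset, -1 when exhausted
  }

  interface Scanner {
    Tuple next() throws java.io.IOException;     // next tuple, null at end of data
    void seek(long offset) throws java.io.IOException;
  }

  interface Qual {
    boolean isTrue(Tuple t);                     // evaluates the scan qualification
  }

  interface Projector {
    void eval(Tuple in, Tuple out);              // projects 'in' into 'out'
  }

  final class MergedIndexScanSketch {
    private final IndexReader reader;
    private final Scanner fileScanner;
    private final Qual qual;
    private final Projector projector;
    private final Tuple outTuple;

    MergedIndexScanSketch(IndexReader reader, Scanner fileScanner,
                          Qual qual, Projector projector, Tuple outTuple) {
      this.reader = reader;
      this.fileScanner = fileScanner;
      this.qual = qual;
      this.projector = projector;
      this.outTuple = outTuple;
    }

    // Mirrors the in-memory branch of the merged hunk.
    Tuple nextFromIndex() throws java.io.IOException {
      Tuple tuple;
      while (reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
        // LOG.info("while: " + tuple);
        if (qual.isTrue(tuple)) {
          projector.eval(tuple, outTuple);
          // LOG.info("return: " + outTuple);
          return outTuple;               // matching tuple: project it and return
        } else {
          long offset = reader.next();
          // LOG.info("offset: " + offset);
          if (offset == -1) {
            return null;                 // index exhausted
          } else {
            fileScanner.seek(offset);    // jump to the next indexed position
          }
          return null;                   // added by this merge: the loop exits on the first non-match
        }
      }
      return null;
    }
  }

As the final comment notes, the merged hunk returns null right after seeking, so the while loop never revisits the seeked position; together with the added LOG.info calls this reads like temporary instrumentation carried along on the index_support branch.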

http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 7cb1716,bc0d212..98c5de6
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@@ -23,14 -23,9 +23,13 @@@ import com.google.protobuf.ByteString
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.tajo.QueryId;
- import org.apache.tajo.SessionVars;
  import org.apache.tajo.TaskAttemptId;
  import org.apache.tajo.TaskId;
 -import org.apache.tajo.catalog.*;
 +import org.apache.tajo.catalog.CatalogUtil;
 +import org.apache.tajo.catalog.Column;
 +import org.apache.tajo.catalog.Schema;
 +import org.apache.tajo.catalog.TableDesc;
 +import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.catalog.proto.CatalogProtos.*;
  import org.apache.tajo.catalog.statistics.TableStats;
  import org.apache.tajo.common.TajoDataTypes.DataType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index ea04a48,c849940..d2afb83
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@@ -18,853 -18,35 +18,663 @@@
  
  package org.apache.tajo.worker;
  
- import com.google.common.annotations.VisibleForTesting;
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
- import io.netty.handler.codec.http.QueryStringDecoder;
- import org.apache.commons.lang.exception.ExceptionUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.tajo.TajoProtos;
- import org.apache.tajo.TajoProtos.TaskAttemptState;
- import org.apache.tajo.TaskAttemptId;
- import org.apache.tajo.catalog.Schema;
- import org.apache.tajo.catalog.TableDesc;
- import org.apache.tajo.catalog.TableMeta;
- import org.apache.tajo.catalog.proto.CatalogProtos;
- import org.apache.tajo.catalog.statistics.TableStats;
- import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.engine.planner.physical.PhysicalExec;
- import org.apache.tajo.engine.query.QueryContext;
- import org.apache.tajo.engine.query.TaskRequest;
- import org.apache.tajo.ipc.QueryMasterProtocol;
- import org.apache.tajo.ipc.TajoWorkerProtocol.*;
- import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
- import org.apache.tajo.master.cluster.WorkerConnectionInfo;
- import org.apache.tajo.plan.function.python.TajoScriptEngine;
- import org.apache.tajo.plan.logical.*;
- import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
- import org.apache.tajo.plan.util.PlannerUtil;
- import org.apache.tajo.pullserver.TajoPullServerService;
- import org.apache.tajo.pullserver.retriever.FileChunk;
- import org.apache.tajo.rpc.NullCallback;
- import org.apache.tajo.storage.*;
- import org.apache.tajo.storage.fragment.FileFragment;
- import org.apache.tajo.util.NetUtils;
+ import org.apache.tajo.ipc.TajoWorkerProtocol;
  
- import java.io.File;
  import java.io.IOException;
- import java.net.InetAddress;
- import java.net.URI;
- import java.util.*;
- import java.util.Map.Entry;
- import java.util.concurrent.ExecutorService;
  
- import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
- import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
- 
- public class Task {
-   private static final Log LOG = LogFactory.getLog(Task.class);
-   private static final float FETCHER_PROGRESS = 0.5f;
- 
-   private final TajoConf systemConf;
-   private final QueryContext queryContext;
-   private final ExecutionBlockContext executionBlockContext;
-   private final TaskAttemptId taskId;
-   private final String taskRunnerId;
- 
-   private final Path taskDir;
-   private final TaskRequest request;
-   private TaskAttemptContext context;
-   private List<Fetcher> fetcherRunners;
-   private LogicalNode plan;
-   private final Map<String, TableDesc> descs = Maps.newHashMap();
-   private PhysicalExec executor;
-   private boolean interQuery;
-   private Path inputTableBaseDir;
- 
-   private long startTime;
-   private long finishTime;
- 
-   private final TableStats inputStats;
-   private List<FileChunk> localChunks;
- 
-   // TODO - to be refactored
-   private ShuffleType shuffleType = null;
-   private Schema finalSchema = null;
-   private TupleComparator sortComp = null;
- 
-   public Task(String taskRunnerId,
-               Path baseDir,
-               TaskAttemptId taskId,
-               final ExecutionBlockContext executionBlockContext,
-               final TaskRequest request) throws IOException {
-     this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
-   }
- 
-   public Task(String taskRunnerId,
-               Path baseDir,
-               TaskAttemptId taskId,
-               TajoConf conf,
-               final ExecutionBlockContext executionBlockContext,
-               final TaskRequest request) throws IOException {
-     this.taskRunnerId = taskRunnerId;
-     this.request = request;
-     this.taskId = taskId;
- 
-     this.systemConf = conf;
-     this.queryContext = request.getQueryContext(systemConf);
-     this.executionBlockContext = executionBlockContext;
-     this.taskDir = StorageUtil.concatPath(baseDir,
-         taskId.getTaskId().getId() + "_" + taskId.getId());
- 
-     this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
-         request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
-     this.context.setDataChannel(request.getDataChannel());
-     this.context.setEnforcer(request.getEnforcer());
-     this.context.setState(TaskAttemptState.TA_PENDING);
-     this.inputStats = new TableStats();
-     this.fetcherRunners = Lists.newArrayList();
-   }
- 
-   public void initPlan() throws IOException {
-     plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
-     updateDescsForScanNodes(NodeType.SCAN);
-     updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
-     updateDescsForScanNodes(NodeType.INDEX_SCAN);
-     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
-     if (scanNode != null) {
-       for (LogicalNode node : scanNode) {
-         ScanNode scan = (ScanNode) node;
-         descs.put(scan.getCanonicalName(), scan.getTableDesc());
-       }
-     }
- 
-     LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
-     if (partitionScanNode != null) {
-       for (LogicalNode node : partitionScanNode) {
-         PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
-         descs.put(scan.getCanonicalName(), scan.getTableDesc());
-       }
-     }
- 
-     interQuery = request.getProto().getInterQuery();
-     if (interQuery) {
-       context.setInterQuery();
-       this.shuffleType = context.getDataChannel().getShuffleType();
- 
-       if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-         SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
-         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
-         this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
-       }
-     } else {
-       Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
-           .getAppenderFilePath(taskId, queryContext.getStagingDir());
-       LOG.info("Output File Path: " + outFilePath);
-       context.setOutputPath(outFilePath);
-     }
- 
-     this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
-     LOG.info("==================================");
-     LOG.info("* Stage " + request.getId() + " is initialized");
-     LOG.info("* InterQuery: " + interQuery
-         + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
-         ", Fragments (num: " + request.getFragments().size() + ")" +
-         ", Fetches (total:" + request.getFetches().size() + ") :");
- 
-     if(LOG.isDebugEnabled()) {
-       for (FetchImpl f : request.getFetches()) {
-         LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
-       }
-     }
-     LOG.info("* Local task dir: " + taskDir);
-     if(LOG.isDebugEnabled()) {
-       LOG.debug("* plan:\n");
-       LOG.debug(plan.toString());
-     }
-     LOG.info("==================================");
-   }
- 
-   private void updateDescsForScanNodes(NodeType nodeType) {
-     assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
-     LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
-     if (scanNodes != null) {
-       for (LogicalNode node : scanNodes) {
-         ScanNode scanNode = (ScanNode) node;
-         descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
-       }
-     }
-   }
- 
-   private void startScriptExecutors() throws IOException {
-     for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
-       executor.start(systemConf);
-     }
-   }
- 
-   private void stopScriptExecutors() {
-     for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
-       executor.shutdown();
-     }
-   }
- 
-   public void init() throws IOException {
-     initPlan();
-     startScriptExecutors();
- 
-     if (context.getState() == TaskAttemptState.TA_PENDING) {
-       // initialize a task temporal dir
-       FileSystem localFS = executionBlockContext.getLocalFS();
-       localFS.mkdirs(taskDir);
- 
-       if (request.getFetches().size() > 0) {
-         inputTableBaseDir = localFS.makeQualified(
-             executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
-                 getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
-         localFS.mkdirs(inputTableBaseDir);
-         Path tableDir;
-         for (String inputTable : context.getInputTables()) {
-           tableDir = new Path(inputTableBaseDir, inputTable);
-           if (!localFS.exists(tableDir)) {
-             LOG.info("the directory is created  " + tableDir.toUri());
-             localFS.mkdirs(tableDir);
-           }
-         }
-       }
-       // for localizing the intermediate data
-       fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
-     }
-   }
- 
-   public TaskAttemptId getTaskId() {
-     return taskId;
-   }
- 
-   public TaskAttemptId getId() {
-     return context.getTaskId();
-   }
- 
-   public TaskAttemptState getStatus() {
-     return context.getState();
-   }
- 
-   public String toString() {
-     return "queryId: " + this.getId() + " status: " + this.getStatus();
-   }
- 
-   public void setState(TaskAttemptState status) {
-     context.setState(status);
-   }
- 
-   public TaskAttemptContext getContext() {
-     return context;
-   }
- 
-   public boolean hasFetchPhase() {
-     return fetcherRunners.size() > 0;
-   }
- 
-   public List<Fetcher> getFetchers() {
-     return new ArrayList<Fetcher>(fetcherRunners);
-   }
- 
-   public void fetch() {
-     ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
-     for (Fetcher f : fetcherRunners) {
-       executorService.submit(new FetchRunner(context, f));
-     }
-   }
- 
-   public void kill() {
-     stopScriptExecutors();
-     context.setState(TaskAttemptState.TA_KILLED);
-     context.stop();
-   }
- 
-   public void abort() {
-     stopScriptExecutors();
-     context.stop();
-   }
- 
-   public void cleanUp() {
-     // remove itself from worker
-     if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
-       synchronized (executionBlockContext.getTasks()) {
-         executionBlockContext.getTasks().remove(this.getId());
-       }
-     } else {
-       LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState());
-     }
-   }
- 
-   public TaskStatusProto getReport() {
-     TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
-     builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
-     builder.setId(context.getTaskId().getProto())
-         .setProgress(context.getProgress())
-         .setState(context.getState());
- 
-     builder.setInputStats(reloadInputStats());
- 
-     if (context.getResultStats() != null) {
-       builder.setResultStats(context.getResultStats().getProto());
-     }
-     return builder.build();
-   }
- 
-   public boolean isRunning(){
-     return context.getState() == TaskAttemptState.TA_RUNNING;
-   }
- 
-   public boolean isProgressChanged() {
-     return context.isProgressChanged();
-   }
- 
-   public void updateProgress() {
-     if(context != null && context.isStopped()){
-       return;
-     }
- 
-     if (executor != null && context.getProgress() < 1.0f) {
-       context.setExecutorProgress(executor.getProgress());
-     }
-   }
- 
-   private CatalogProtos.TableStatsProto reloadInputStats() {
-     synchronized(inputStats) {
-       if (this.executor == null) {
-         return inputStats.getProto();
-       }
- 
-       TableStats executorInputStats = this.executor.getInputStats();
- 
-       if (executorInputStats != null) {
-         inputStats.setValues(executorInputStats);
-       }
-       return inputStats.getProto();
-     }
-   }
- 
-   private TaskCompletionReport getTaskCompletionReport() {
-     TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
-     builder.setId(context.getTaskId().getProto());
- 
-     builder.setInputStats(reloadInputStats());
- 
-     if (context.hasResultStats()) {
-       builder.setResultStats(context.getResultStats().getProto());
-     } else {
-       builder.setResultStats(new TableStats().getProto());
-     }
- 
-     Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
-     if (it.hasNext()) {
-       do {
-         Entry<Integer, String> entry = it.next();
-         ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
-         part.setPartId(entry.getKey());
- 
-         // Set output volume
-         if (context.getPartitionOutputVolume() != null) {
-           for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
-             if (entry.getKey().equals(e.getKey())) {
-               part.setVolume(e.getValue().longValue());
-               break;
-             }
-           }
-         }
- 
-         builder.addShuffleFileOutputs(part.build());
-       } while (it.hasNext());
-     }
- 
-     return builder.build();
-   }
- 
-   private void waitForFetch() throws InterruptedException, IOException {
-     context.getFetchLatch().await();
-     LOG.info(context.getTaskId() + " All fetches are done!");
-     Collection<String> inputs = Lists.newArrayList(context.getInputTables());
- 
-     // Get all broadcasted tables
-     Set<String> broadcastTableNames = new HashSet<String>();
-     List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
-     if (broadcasts != null) {
-       for (EnforceProperty eachBroadcast : broadcasts) {
-         broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
-       }
-     }
- 
-     // localize the fetched data and skip the broadcast table
-     for (String inputTable: inputs) {
-       if (broadcastTableNames.contains(inputTable)) {
-         continue;
-       }
-       File tableDir = new File(context.getFetchIn(), inputTable);
-       FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
-       context.updateAssignedFragments(inputTable, frags);
-     }
-   }
- 
-   public void run() throws Exception {
-     startTime = System.currentTimeMillis();
-     Throwable error = null;
-     try {
-       if(!context.isStopped()) {
-         context.setState(TaskAttemptState.TA_RUNNING);
-         if (context.hasFetchPhase()) {
-           // If the fetch is still in progress, the query unit must wait for
-           // complete.
-           waitForFetch();
-           context.setFetcherProgress(FETCHER_PROGRESS);
-           context.setProgressChanged(true);
-           updateProgress();
-         }
- 
-         this.executor = executionBlockContext.getTQueryEngine().
-             createPlan(context, plan);
-         this.executor.init();
- 
-         while(!context.isStopped() && executor.next() != null) {
-         }
-       }
-     } catch (Throwable e) {
-       error = e ;
-       LOG.error(e.getMessage(), e);
-       stopScriptExecutors();
-       context.stop();
-     } finally {
-       if (executor != null) {
-         try {
-           executor.close();
-           reloadInputStats();
-         } catch (IOException e) {
-           LOG.error(e, e);
-         }
-         this.executor = null;
-       }
- 
-       executionBlockContext.completedTasksNum.incrementAndGet();
-       context.getHashShuffleAppenderManager().finalizeTask(taskId);
- 
-       QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
-       if (context.isStopped()) {
-         context.setExecutorProgress(0.0f);
- 
-         if (context.getState() == TaskAttemptState.TA_KILLED) {
-           queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
-           executionBlockContext.killedTasksNum.incrementAndGet();
-         } else {
-           context.setState(TaskAttemptState.TA_FAILED);
-           TaskFatalErrorReport.Builder errorBuilder =
-               TaskFatalErrorReport.newBuilder()
-                   .setId(getId().getProto());
-           if (error != null) {
-             if (error.getMessage() == null) {
-               errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
-             } else {
-               errorBuilder.setErrorMessage(error.getMessage());
-             }
-             errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
-           }
- 
-           queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
-           executionBlockContext.failedTasksNum.incrementAndGet();
-         }
-       } else {
-         // if successful
-         context.setProgress(1.0f);
-         context.setState(TaskAttemptState.TA_SUCCEEDED);
-         executionBlockContext.succeededTasksNum.incrementAndGet();
- 
-         TaskCompletionReport report = getTaskCompletionReport();
-         queryMasterStub.done(null, report, NullCallback.get());
-       }
-       finishTime = System.currentTimeMillis();
-       LOG.info(context.getTaskId() + " completed. " +
-           "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
-           ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
-           + ", killed: " + executionBlockContext.killedTasksNum.intValue()
-           + ", failed: " + executionBlockContext.failedTasksNum.intValue());
-       cleanupTask();
-     }
-   }
- 
-   public void cleanupTask() {
-     TaskHistory taskHistory = createTaskHistory();
-     executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
-     executionBlockContext.getTasks().remove(getId());
- 
-     fetcherRunners.clear();
-     fetcherRunners = null;
-     try {
-       if(executor != null) {
-         executor.close();
-         executor = null;
-       }
-     } catch (IOException e) {
-       LOG.fatal(e.getMessage(), e);
-     }
- 
-     executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
-     stopScriptExecutors();
-   }
- 
-   public TaskHistory createTaskHistory() {
-     TaskHistory taskHistory = null;
-     try {
-       taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
-           startTime, finishTime, reloadInputStats());
- 
-       if (context.getOutputPath() != null) {
-         taskHistory.setOutputPath(context.getOutputPath().toString());
-       }
- 
-       if (context.getWorkDir() != null) {
-         taskHistory.setWorkingPath(context.getWorkDir().toString());
-       }
- 
-       if (context.getResultStats() != null) {
-         taskHistory.setOutputStats(context.getResultStats().getProto());
-       }
- 
-       if (hasFetchPhase()) {
-         taskHistory.setTotalFetchCount(fetcherRunners.size());
-         int i = 0;
-         FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
-         for (Fetcher fetcher : fetcherRunners) {
-           // TODO store the fetcher histories
-           if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
-             builder.setStartTime(fetcher.getStartTime());
-             builder.setFinishTime(fetcher.getFinishTime());
-             builder.setFileLength(fetcher.getFileLen());
-             builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
-             builder.setState(fetcher.getState());
- 
-             taskHistory.addFetcherHistory(builder.build());
-           }
-           if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
-         }
-         taskHistory.setFinishedFetchCount(i);
-       }
-     } catch (Exception e) {
-       LOG.warn(e.getMessage(), e);
-     }
- 
-     return taskHistory;
-   }
- 
-   public int hashCode() {
-     return context.hashCode();
-   }
- 
-   public boolean equals(Object obj) {
-     if (obj instanceof Task) {
-       Task other = (Task) obj;
-       return this.context.equals(other.context);
-     }
-     return false;
-   }
- 
-   private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
-       throws IOException {
-     Configuration c = new Configuration(systemConf);
-     c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
-     FileSystem fs = FileSystem.get(c);
-     Path tablePath = new Path(file.getAbsolutePath());
- 
-     List<FileFragment> listTablets = new ArrayList<FileFragment>();
-     FileFragment tablet;
- 
-     FileStatus[] fileLists = fs.listStatus(tablePath);
-     for (FileStatus f : fileLists) {
-       if (f.getLen() == 0) {
-         continue;
-       }
-       tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
-       listTablets.add(tablet);
-     }
- 
-     // Special treatment for locally pseudo fetched chunks
-     synchronized (localChunks) {
-       for (FileChunk chunk : localChunks) {
-         if (name.equals(chunk.getEbId())) {
-           tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
-           listTablets.add(tablet);
-           LOG.info("One local chunk is added to listTablets");
-         }
-       }
-     }
- 
-     FileFragment[] tablets = new FileFragment[listTablets.size()];
-     listTablets.toArray(tablets);
- 
-     return tablets;
-   }
- 
-   private class FetchRunner implements Runnable {
-     private final TaskAttemptContext ctx;
-     private final Fetcher fetcher;
-     private int maxRetryNum;
- 
-     public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
-       this.ctx = ctx;
-       this.fetcher = fetcher;
-       this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
-     }
- 
-     @Override
-     public void run() {
-       int retryNum = 0;
-       int retryWaitTime = 1000; //sec
- 
-       try { // for releasing fetch latch
-         while(!context.isStopped() && retryNum < maxRetryNum) {
-           if (retryNum > 0) {
-             try {
-               Thread.sleep(retryWaitTime);
-               retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // max 10 seconds
-             } catch (InterruptedException e) {
-               LOG.error(e);
-             }
-             LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
-           }
-           try {
-             FileChunk fetched = fetcher.get();
-             if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
-           && fetched.getFile() != null) {
-               if (fetched.fromRemote() == false) {
-           localChunks.add(fetched);
-           LOG.info("Add a new FileChunk to local chunk list");
-               }
-               break;
-             }
-           } catch (Throwable e) {
-             LOG.error("Fetch failed: " + fetcher.getURI(), e);
-           }
-           retryNum++;
-         }
-       } finally {
-         if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
-           fetcherFinished(ctx);
-         } else {
-           if (retryNum == maxRetryNum) {
-             LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
-           }
-           stopScriptExecutors();
-           context.stop(); // retry task
-           ctx.getFetchLatch().countDown();
-         }
-       }
-     }
-   }
- 
-   @VisibleForTesting
-   public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
-     if (totalFetcher > 0) {
-       return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
-     } else {
-       return 0.0f;
-     }
-   }
- 
-   private synchronized void fetcherFinished(TaskAttemptContext ctx) {
-     int fetcherSize = fetcherRunners.size();
-     if(fetcherSize == 0) {
-       return;
-     }
- 
-     ctx.getFetchLatch().countDown();
- 
-     int remainFetcher = (int) ctx.getFetchLatch().getCount();
-     if (remainFetcher == 0) {
-       context.setFetcherProgress(FETCHER_PROGRESS);
-     } else {
-       context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
-       context.setProgressChanged(true);
-     }
-   }
- 
-   private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                         List<FetchImpl> fetches) throws IOException {
- 
-     if (fetches.size() > 0) {
-       Path inputDir = executionBlockContext.getLocalDirAllocator().
-           getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
- 
-       int i = 0;
-       File storeDir;
-       File defaultStoreFile;
-       FileChunk storeChunk = null;
-       List<Fetcher> runnerList = Lists.newArrayList();
- 
-       for (FetchImpl f : fetches) {
-         storeDir = new File(inputDir.toString(), f.getName());
-         if (!storeDir.exists()) {
-           storeDir.mkdirs();
-         }
- 
-         for (URI uri : f.getURIs()) {
-           defaultStoreFile = new File(storeDir, "in_" + i);
-           InetAddress address = InetAddress.getByName(uri.getHost());
- 
-           WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
-           if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
-             boolean hasError = false;
-             try {
-               LOG.info("Try to get local file chunk at local host");
-               storeChunk = getLocalStoredFileChunk(uri, systemConf);
-             } catch (Throwable t) {
-               hasError = true;
-             }
- 
-             // When a range request is out of range, storeChunk will be NULL. This case is normal state.
-             // So, we should skip and don't need to create storeChunk.
-             if (storeChunk == null && !hasError) {
-               continue;
-             }
- 
-             if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
-                 && hasError == false) {
-               storeChunk.setFromRemote(false);
-             } else {
-               storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-               storeChunk.setFromRemote(true);
-             }
-           } else {
-             storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-             storeChunk.setFromRemote(true);
-           }
- 
-           // If we decide that intermediate data should be really fetched from a remote host, storeChunk
-           // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
-           storeChunk.setEbId(f.getName());
-           Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
-           LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
-           runnerList.add(fetcher);
-           i++;
-         }
-       }
-       ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
-       return runnerList;
-     } else {
-       return Lists.newArrayList();
-     }
-   }
- 
-   private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
-     // Parse the URI
-     LOG.info("getLocalStoredFileChunk starts");
-     final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
-     final List<String> types = params.get("type");
-     final List<String> qids = params.get("qid");
-     final List<String> taskIdList = params.get("ta");
-     final List<String> stageIds = params.get("sid");
-     final List<String> partIds = params.get("p");
-     final List<String> offsetList = params.get("offset");
-     final List<String> lengthList = params.get("length");
- 
-     if (types == null || stageIds == null || qids == null || partIds == null) {
-       LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
-       return null;
-     }
- 
-     if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
-       LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
-       return null;
-     }
- 
-     String queryId = qids.get(0);
-     String shuffleType = types.get(0);
-     String sid = stageIds.get(0);
-     String partId = partIds.get(0);
- 
-     if (shuffleType.equals("r") && taskIdList == null) {
-       LOG.error("Invalid URI - For range shuffle, taskId is required");
-       return null;
-     }
-     List<String> taskIds = splitMaps(taskIdList);
- 
-     FileChunk chunk = null;
-     long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-     long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
- 
-     LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
- 	+ ", taskIds=" + taskIdList);
- 
-     // The working directory of Tajo worker for each query, including stage
-     String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
- 
-     // If the stage requires a range shuffle
-     if (shuffleType.equals("r")) {
-       String ta = taskIds.get(0);
-       if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
-         LOG.warn("Range shuffle - file not exist");
-         return null;
-       }
-       Path path = executionBlockContext.getLocalFS().makeQualified(
- 	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
-       String startKey = params.get("start").get(0);
-       String endKey = params.get("end").get(0);
-       boolean last = params.get("final") != null;
- 
-       try {
-         chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
-             } catch (Throwable t) {
-         LOG.error("getFileChunks() throws exception");
-         return null;
-       }
- 
-       // If the stage requires a hash shuffle or a scattered hash shuffle
-     } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-       int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
-       String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
-       if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
-         LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
-         return null;
-       }
-       Path path = executionBlockContext.getLocalFS().makeQualified(
-         executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
-       File file = new File(path.toUri());
-       long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-       long readLen = (offset >= 0 && length >= 0) ? length : file.length();
- 
-       if (startPos >= file.length()) {
-         LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
-         return null;
-       }
-       chunk = new FileChunk(file, startPos, readLen);
- 
-     } else {
-       LOG.error("Unknown shuffle type");
-       return null;
-     }
- 
-     return chunk;
-   }
- 
-   private List<String> splitMaps(List<String> mapq) {
-     if (null == mapq) {
-       return null;
-     }
-     final List<String> ret = new ArrayList<String>();
-     for (String s : mapq) {
-       Collections.addAll(ret, s.split(","));
-     }
-     return ret;
-   }
- 
-   public static Path getTaskAttemptDir(TaskAttemptId quid) {
-     Path workDir =
-         StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
-             String.valueOf(quid.getTaskId().getId()),
-             String.valueOf(quid.getId()));
-     return workDir;
-   }
+ public interface Task {
+ 
+   void init() throws IOException;
+ 
+   void fetch();
+ 
+   void run() throws Exception;
+ 
+   void kill();
+ 
+   void abort();
+ 
+   void cleanup();
+ 
+   boolean hasFetchPhase();
+ 
+   boolean isProgressChanged();
+ 
+   boolean isStopped();
+ 
+   void updateProgress();
+ 
+   TaskAttemptContext getTaskContext();
+ 
+   ExecutionBlockContext getExecutionBlockContext();
+ 
++//<<<<<<< HEAD
++//
++//  public TaskAttemptId getTaskId() {
++//    return taskId;
++//  }
++//
++//  public TaskAttemptId getId() {
++//    return context.getTaskId();
++//  }
++//
++//  public TaskAttemptState getStatus() {
++//    return context.getState();
++//  }
++//
++//  public String toString() {
++//    return "queryId: " + this.getId() + " status: " + this.getStatus();
++//  }
++//
++//  public void setState(TaskAttemptState status) {
++//    context.setState(status);
++//  }
++//
++//  public TaskAttemptContext getContext() {
++//    return context;
++//  }
++//
++//  public boolean hasFetchPhase() {
++//    return fetcherRunners.size() > 0;
++//  }
++//
++//  public List<Fetcher> getFetchers() {
++//    return new ArrayList<Fetcher>(fetcherRunners);
++//  }
++//
++//  public void fetch() {
++//    ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
++//    for (Fetcher f : fetcherRunners) {
++//      executorService.submit(new FetchRunner(context, f));
++//    }
++//  }
++//
++//  public void kill() {
++//    stopScriptExecutors();
++//    context.setState(TaskAttemptState.TA_KILLED);
++//    context.stop();
++//  }
++//
++//  public void abort() {
++//    stopScriptExecutors();
++//    context.stop();
++//  }
++//
++//  public void cleanUp() {
++//    // remove itself from worker
++//    if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
++//      synchronized (executionBlockContext.getTasks()) {
++//        executionBlockContext.getTasks().remove(this.getId());
++//      }
++//    } else {
++//      LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState());
++//    }
++//  }
++//
++//  public TaskStatusProto getReport() {
++//    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
++//    builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
++//    builder.setId(context.getTaskId().getProto())
++//        .setProgress(context.getProgress())
++//        .setState(context.getState());
++//
++//    builder.setInputStats(reloadInputStats());
++//
++//    if (context.getResultStats() != null) {
++//      builder.setResultStats(context.getResultStats().getProto());
++//    }
++//    return builder.build();
++//  }
++//
++//  public boolean isRunning(){
++//    return context.getState() == TaskAttemptState.TA_RUNNING;
++//  }
++//
++//  public boolean isProgressChanged() {
++//    return context.isProgressChanged();
++//  }
++//
++//  public void updateProgress() {
++//    if(context != null && context.isStopped()){
++//      return;
++//    }
++//
++//    if (executor != null && context.getProgress() < 1.0f) {
++//      context.setExecutorProgress(executor.getProgress());
++//    }
++//  }
++//
++//  private CatalogProtos.TableStatsProto reloadInputStats() {
++//    synchronized(inputStats) {
++//      if (this.executor == null) {
++//        return inputStats.getProto();
++//      }
++//
++//      TableStats executorInputStats = this.executor.getInputStats();
++//
++//      if (executorInputStats != null) {
++//        inputStats.setValues(executorInputStats);
++//      }
++//      return inputStats.getProto();
++//    }
++//  }
++//
++//  private TaskCompletionReport getTaskCompletionReport() {
++//    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
++//    builder.setId(context.getTaskId().getProto());
++//
++//    builder.setInputStats(reloadInputStats());
++//
++//    if (context.hasResultStats()) {
++//      builder.setResultStats(context.getResultStats().getProto());
++//    } else {
++//      builder.setResultStats(new TableStats().getProto());
++//    }
++//
++//    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
++//    if (it.hasNext()) {
++//      do {
++//        Entry<Integer, String> entry = it.next();
++//        ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
++//        part.setPartId(entry.getKey());
++//
++//        // Set output volume
++//        if (context.getPartitionOutputVolume() != null) {
++//          for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
++//            if (entry.getKey().equals(e.getKey())) {
++//              part.setVolume(e.getValue().longValue());
++//              break;
++//            }
++//          }
++//        }
++//
++//        builder.addShuffleFileOutputs(part.build());
++//      } while (it.hasNext());
++//    }
++//
++//    return builder.build();
++//  }
++//
++//  private void waitForFetch() throws InterruptedException, IOException {
++//    context.getFetchLatch().await();
++//    LOG.info(context.getTaskId() + " All fetches are done!");
++//    Collection<String> inputs = Lists.newArrayList(context.getInputTables());
++//
++//    // Get all broadcasted tables
++//    Set<String> broadcastTableNames = new HashSet<String>();
++//    List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
++//    if (broadcasts != null) {
++//      for (EnforceProperty eachBroadcast : broadcasts) {
++//        broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
++//      }
++//    }
++//
++//    // localize the fetched data and skip the broadcast table
++//    for (String inputTable: inputs) {
++//      if (broadcastTableNames.contains(inputTable)) {
++//        continue;
++//      }
++//      File tableDir = new File(context.getFetchIn(), inputTable);
++//      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
++//      context.updateAssignedFragments(inputTable, frags);
++//    }
++//  }
++//
++//  public void run() throws Exception {
++//    startTime = System.currentTimeMillis();
++//    Throwable error = null;
++//    try {
++//      if(!context.isStopped()) {
++//        context.setState(TaskAttemptState.TA_RUNNING);
++//        if (context.hasFetchPhase()) {
++//          // If the fetch is still in progress, the query unit must wait
++//          // until it completes.
++//          waitForFetch();
++//          context.setFetcherProgress(FETCHER_PROGRESS);
++//          context.setProgressChanged(true);
++//          updateProgress();
++//        }
++//
++//        this.executor = executionBlockContext.getTQueryEngine().
++//            createPlan(context, plan);
++//        this.executor.init();
++//
++//        while(!context.isStopped() && executor.next() != null) {
++//        }
++//      }
++//    } catch (Throwable e) {
++//      error = e ;
++//      LOG.error(e.getMessage(), e);
++//      stopScriptExecutors();
++//      context.stop();
++//    } finally {
++//      if (executor != null) {
++//        try {
++//          executor.close();
++//          reloadInputStats();
++//        } catch (IOException e) {
++//          LOG.error(e, e);
++//        }
++//        this.executor = null;
++//      }
++//
++//      executionBlockContext.completedTasksNum.incrementAndGet();
++//      context.getHashShuffleAppenderManager().finalizeTask(taskId);
++//
++//      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
++//      if (context.isStopped()) {
++//        context.setExecutorProgress(0.0f);
++//
++//        if (context.getState() == TaskAttemptState.TA_KILLED) {
++//          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
++//          executionBlockContext.killedTasksNum.incrementAndGet();
++//        } else {
++//          context.setState(TaskAttemptState.TA_FAILED);
++//          TaskFatalErrorReport.Builder errorBuilder =
++//              TaskFatalErrorReport.newBuilder()
++//                  .setId(getId().getProto());
++//          if (error != null) {
++//            if (error.getMessage() == null) {
++//              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
++//            } else {
++//              errorBuilder.setErrorMessage(error.getMessage());
++//            }
++//            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
++//          }
++//
++//          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
++//          executionBlockContext.failedTasksNum.incrementAndGet();
++//        }
++//      } else {
++//        // if successful
++//        context.setProgress(1.0f);
++//        context.setState(TaskAttemptState.TA_SUCCEEDED);
++//        executionBlockContext.succeededTasksNum.incrementAndGet();
++//
++//        TaskCompletionReport report = getTaskCompletionReport();
++//        queryMasterStub.done(null, report, NullCallback.get());
++//      }
++//      finishTime = System.currentTimeMillis();
++//      LOG.info(context.getTaskId() + " completed. " +
++//          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
++//          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
++//          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
++//          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
++//      cleanupTask();
++//    }
++//  }
++//
++//  public void cleanupTask() {
++//    TaskHistory taskHistory = createTaskHistory();
++//    executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
++//    executionBlockContext.getTasks().remove(getId());
++//
++//    fetcherRunners.clear();
++//    fetcherRunners = null;
++//    try {
++//      if(executor != null) {
++//        executor.close();
++//        executor = null;
++//      }
++//    } catch (IOException e) {
++//      LOG.fatal(e.getMessage(), e);
++//    }
++//
++//    executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
++//    stopScriptExecutors();
++//  }
++//
++//  public TaskHistory createTaskHistory() {
++//    TaskHistory taskHistory = null;
++//    try {
++//      taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
++//          startTime, finishTime, reloadInputStats());
++//
++//      if (context.getOutputPath() != null) {
++//        taskHistory.setOutputPath(context.getOutputPath().toString());
++//      }
++//
++//      if (context.getWorkDir() != null) {
++//        taskHistory.setWorkingPath(context.getWorkDir().toString());
++//      }
++//
++//      if (context.getResultStats() != null) {
++//        taskHistory.setOutputStats(context.getResultStats().getProto());
++//      }
++//
++//      if (hasFetchPhase()) {
++//        taskHistory.setTotalFetchCount(fetcherRunners.size());
++//        int i = 0;
++//        FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
++//        for (Fetcher fetcher : fetcherRunners) {
++//          // TODO store the fetcher histories
++//          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
++//            builder.setStartTime(fetcher.getStartTime());
++//            builder.setFinishTime(fetcher.getFinishTime());
++//            builder.setFileLength(fetcher.getFileLen());
++//            builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
++//            builder.setState(fetcher.getState());
++//
++//            taskHistory.addFetcherHistory(builder.build());
++//          }
++//          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
++//        }
++//        taskHistory.setFinishedFetchCount(i);
++//      }
++//    } catch (Exception e) {
++//      LOG.warn(e.getMessage(), e);
++//    }
++//
++//    return taskHistory;
++//  }
++//
++//  public int hashCode() {
++//    return context.hashCode();
++//  }
++//
++//  public boolean equals(Object obj) {
++//    if (obj instanceof Task) {
++//      Task other = (Task) obj;
++//      return this.context.equals(other.context);
++//    }
++//    return false;
++//  }
++//
++//  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
++//      throws IOException {
++//    Configuration c = new Configuration(systemConf);
++//    c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
++//    FileSystem fs = FileSystem.get(c);
++//    Path tablePath = new Path(file.getAbsolutePath());
++//
++//    List<FileFragment> listTablets = new ArrayList<FileFragment>();
++//    FileFragment tablet;
++//
++//    FileStatus[] fileLists = fs.listStatus(tablePath);
++//    for (FileStatus f : fileLists) {
++//      if (f.getLen() == 0) {
++//        continue;
++//      }
++//      tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
++//      listTablets.add(tablet);
++//    }
++//
++//    // Special treatment for chunks that were pseudo-fetched from the local host
++//    synchronized (localChunks) {
++//      for (FileChunk chunk : localChunks) {
++//        if (name.equals(chunk.getEbId())) {
++//          tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
++//          listTablets.add(tablet);
++//          LOG.info("One local chunk is added to listTablets");
++//        }
++//      }
++//    }
++//
++//    FileFragment[] tablets = new FileFragment[listTablets.size()];
++//    listTablets.toArray(tablets);
++//
++//    return tablets;
++//  }
++//
++//  private class FetchRunner implements Runnable {
++//    private final TaskAttemptContext ctx;
++//    private final Fetcher fetcher;
++//    private int maxRetryNum;
++//
++//    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
++//      this.ctx = ctx;
++//      this.fetcher = fetcher;
++//      this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
++//    }
++//
++//    @Override
++//    public void run() {
++//      int retryNum = 0;
++//      int retryWaitTime = 1000; // initial wait in milliseconds
++//
++//      try { // for releasing fetch latch
++//        while(!context.isStopped() && retryNum < maxRetryNum) {
++//          if (retryNum > 0) {
++//            try {
++//              Thread.sleep(retryWaitTime);
++//              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // max 10 seconds
++//            } catch (InterruptedException e) {
++//              LOG.error(e);
++//            }
++//            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
++//          }
++//          try {
++//            FileChunk fetched = fetcher.get();
++//            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
++//                && fetched.getFile() != null) {
++//              if (fetched.fromRemote() == false) {
++//                localChunks.add(fetched);
++//                LOG.info("Add a new FileChunk to local chunk list");
++//              }
++//              break;
++//            }
++//          } catch (Throwable e) {
++//            LOG.error("Fetch failed: " + fetcher.getURI(), e);
++//          }
++//          retryNum++;
++//        }
++//      } finally {
++//        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
++//          fetcherFinished(ctx);
++//        } else {
++//          if (retryNum == maxRetryNum) {
++//            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
++//          }
++//          stopScriptExecutors();
++//          context.stop(); // retry task
++//          ctx.getFetchLatch().countDown();
++//        }
++//      }
++//    }
++//  }
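The FetchRunner above retries a failed fetch with a capped exponential backoff: it sleeps before every retry, doubling the wait from 1 second up to a 10-second ceiling. Below is a minimal, self-contained sketch of just that policy; the BooleanSupplier callback is a hypothetical stand-in for a single fetcher.get() attempt and is not part of this commit.

    import java.util.function.BooleanSupplier;

    public class BackoffSketch {
      // Returns true as soon as one attempt succeeds, false once all retries are exhausted.
      static boolean fetchWithRetry(BooleanSupplier attempt, int maxRetryNum) throws InterruptedException {
        int waitMillis = 1000;                                  // initial wait: 1 second
        for (int retry = 0; retry < maxRetryNum; retry++) {
          if (retry > 0) {
            Thread.sleep(waitMillis);                           // back off before retrying
            waitMillis = Math.min(10 * 1000, waitMillis * 2);   // double the wait, capped at 10 seconds
          }
          if (attempt.getAsBoolean()) {                         // one fetch attempt
            return true;
          }
        }
        return false;
      }
    }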
++//
++//  @VisibleForTesting
++//  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
++//    if (totalFetcher > 0) {
++//      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
++//    } else {
++//      return 0.0f;
++//    }
++//  }
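adjustFetchProcess() above maps the number of finished fetchers onto the fetcher's share of overall task progress. A quick worked example, assuming FETCHER_PROGRESS is 0.5f (the constant's value is not visible in this diff, so treat it as an assumption):

    float FETCHER_PROGRESS = 0.5f;                 // assumed value, for illustration only
    int totalFetcher = 4, remainFetcher = 1;       // 4 fetchers in total, 1 still running
    float progress = ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;  // 0.375f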
++//
++//  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
++//    int fetcherSize = fetcherRunners.size();
++//    if(fetcherSize == 0) {
++//      return;
++//    }
++//
++//    ctx.getFetchLatch().countDown();
++//
++//    int remainFetcher = (int) ctx.getFetchLatch().getCount();
++//    if (remainFetcher == 0) {
++//      context.setFetcherProgress(FETCHER_PROGRESS);
++//    } else {
++//      context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
++//      context.setProgressChanged(true);
++//    }
++//  }
++//
++//  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
++//                                        List<FetchImpl> fetches) throws IOException {
++//
++//    if (fetches.size() > 0) {
++//      Path inputDir = executionBlockContext.getLocalDirAllocator().
++//          getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
++//
++//      int i = 0;
++//      File storeDir;
++//      File defaultStoreFile;
++//      FileChunk storeChunk = null;
++//      List<Fetcher> runnerList = Lists.newArrayList();
++//
++//      for (FetchImpl f : fetches) {
++//        storeDir = new File(inputDir.toString(), f.getName());
++//        if (!storeDir.exists()) {
++//          storeDir.mkdirs();
++//        }
++//
++//        for (URI uri : f.getURIs()) {
++//          defaultStoreFile = new File(storeDir, "in_" + i);
++//          InetAddress address = InetAddress.getByName(uri.getHost());
++//
++//          WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
++//          if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
++//            boolean hasError = false;
++//            try {
++//              LOG.info("Try to get local file chunk at local host");
++//              storeChunk = getLocalStoredFileChunk(uri, systemConf);
++//            } catch (Throwable t) {
++//              hasError = true;
++//            }
++//
++//            // When a range request is out of range, storeChunk will be null. This is a normal case,
++//            // so we simply skip it and do not need to create a storeChunk.
++//            if (storeChunk == null && !hasError) {
++//              continue;
++//            }
++//
++//            if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
++//                && hasError == false) {
++//              storeChunk.setFromRemote(false);
++//            } else {
++//              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
++//              storeChunk.setFromRemote(true);
++//            }
++//          } else {
++//            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
++//            storeChunk.setFromRemote(true);
++//          }
++//
++//          // If the intermediate data must actually be fetched from a remote host, storeChunk
++//          // represents a complete file. Otherwise, it may represent either a complete file or only a part of one.
++//          storeChunk.setEbId(f.getName());
++//          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
++//          LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
++//          runnerList.add(fetcher);
++//          i++;
++//        }
++//      }
++//      ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
++//      return runnerList;
++//    } else {
++//      return Lists.newArrayList();
++//    }
++//  }
++//
++//  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
++//    // Parse the URI
++//    LOG.info("getLocalStoredFileChunk starts");
++//    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
++//    final List<String> types = params.get("type");
++//    final List<String> qids = params.get("qid");
++//    final List<String> taskIdList = params.get("ta");
++//    final List<String> stageIds = params.get("sid");
++//    final List<String> partIds = params.get("p");
++//    final List<String> offsetList = params.get("offset");
++//    final List<String> lengthList = params.get("length");
++//
++//    if (types == null || stageIds == null || qids == null || partIds == null) {
++//      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
++//      return null;
++//    }
++//
++//    if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
++//      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
++//      return null;
++//    }
++//
++//    String queryId = qids.get(0);
++//    String shuffleType = types.get(0);
++//    String sid = stageIds.get(0);
++//    String partId = partIds.get(0);
++//
++//    if (shuffleType.equals("r") && taskIdList == null) {
++//      LOG.error("Invalid URI - For range shuffle, taskId is required");
++//      return null;
++//    }
++//    List<String> taskIds = splitMaps(taskIdList);
++//
++//    FileChunk chunk = null;
++//    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
++//    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
++//
++//    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
++//	+ ", taskIds=" + taskIdList);
++//
++//    // The working directory of Tajo worker for each query, including stage
++//    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
++//
++//    // If the stage requires a range shuffle
++//    if (shuffleType.equals("r")) {
++//      String ta = taskIds.get(0);
++//      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
++//        LOG.warn("Range shuffle - file not exist");
++//        return null;
++//      }
++//      Path path = executionBlockContext.getLocalFS().makeQualified(
++//          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
++//      String startKey = params.get("start").get(0);
++//      String endKey = params.get("end").get(0);
++//      boolean last = params.get("final") != null;
++//
++//      try {
++//        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
++//      } catch (Throwable t) {
++//        LOG.error("getFileChunks() threw an exception", t);
++//        return null;
++//      }
++//
++//      // If the stage requires a hash shuffle or a scattered hash shuffle
++//    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
++//      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
++//      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
++//      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
++//        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
++//        return null;
++//      }
++//      Path path = executionBlockContext.getLocalFS().makeQualified(
++//        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
++//      File file = new File(path.toUri());
++//      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
++//      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
++//
++//      if (startPos >= file.length()) {
++//        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
++//        return null;
++//      }
++//      chunk = new FileChunk(file, startPos, readLen);
++//
++//    } else {
++//      LOG.error("Unknown shuffle type");
++//      return null;
++//    }
++//
++//    return chunk;
++//  }
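getLocalStoredFileChunk() above decodes the pull-server fetch URI with Netty's QueryStringDecoder before resolving the local shuffle file. A minimal sketch of just the decoding step; the URI, host, and parameter values below are made up for illustration, and only the parameter names match the code above.

    import io.netty.handler.codec.http.QueryStringDecoder;
    import java.util.List;
    import java.util.Map;

    public class FetchUriSketch {
      public static void main(String[] args) {
        // "r" = range shuffle, "h" = hash shuffle, "s" = scattered hash shuffle
        String uri = "http://worker:8080/?qid=q_0001&sid=2&p=3&type=h&offset=0&length=1024";
        Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
        String shuffleType = params.get("type").get(0);
        String stageId = params.get("sid").get(0);
        String partId = params.get("p").get(0);
        long offset = params.containsKey("offset") ? Long.parseLong(params.get("offset").get(0)) : -1L;
        long length = params.containsKey("length") ? Long.parseLong(params.get("length").get(0)) : -1L;
        System.out.println(shuffleType + " sid=" + stageId + " p=" + partId + " [" + offset + "," + length + "]");
      }
    }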
++//
++//  private List<String> splitMaps(List<String> mapq) {
++//    if (null == mapq) {
++//      return null;
++//    }
++//    final List<String> ret = new ArrayList<String>();
++//    for (String s : mapq) {
++//      Collections.addAll(ret, s.split(","));
++//    }
++//    return ret;
++//  }
++//
++//  public static Path getTaskAttemptDir(TaskAttemptId quid) {
++//    Path workDir =
++//        StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
++//            String.valueOf(quid.getTaskId().getId()),
++//            String.valueOf(quid.getId()));
++//    return workDir;
++//  }
++//=======
+   TajoWorkerProtocol.TaskStatusProto getReport();
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 5a62487,d020639..5d7a53a
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@@ -381,20 -367,9 +367,20 @@@ public class TaskAttemptContext 
      }
      return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
    }
 -  
 +
 +  public String getUniqueKeyFromFragments() {
 +    StringBuilder sb = new StringBuilder();
 +    for (List<FragmentProto> fragments : fragmentMap.values()) {
 +      for (FragmentProto f : fragments) {
 +        FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f);
 +        sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
 +      }
 +    }
 +    return sb.toString();
 +  }
 +
    public int hashCode() {
-     return Objects.hashCode(queryId);
+     return Objects.hashCode(taskId);
    }
    
    public boolean equals(Object obj) {