You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/16 00:42:35 UTC
[13/13] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f674fa8f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f674fa8f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f674fa8f
Branch: refs/heads/index_support
Commit: f674fa8f08622ff752ff56bc6b11a93a520e266c
Parents: 8ec099c dc49049
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Jun 16 07:42:08 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Jun 16 07:42:08 2015 +0900
----------------------------------------------------------------------
CHANGES | 9 +
.../org/apache/tajo/jdbc/TajoResultSetBase.java | 306 ++--
.../org/apache/tajo/storage/RowStoreUtil.java | 93 +-
.../java/org/apache/tajo/datum/DateDatum.java | 34 +-
.../main/java/org/apache/tajo/datum/Datum.java | 7 +
.../org/apache/tajo/datum/DatumFactory.java | 4 +-
.../java/org/apache/tajo/datum/Float4Datum.java | 6 +-
.../java/org/apache/tajo/datum/Float8Datum.java | 6 +-
.../java/org/apache/tajo/datum/Int2Datum.java | 6 +-
.../java/org/apache/tajo/datum/Int4Datum.java | 6 +-
.../java/org/apache/tajo/datum/Int8Datum.java | 6 +-
.../org/apache/tajo/datum/IntervalDatum.java | 6 +-
.../java/org/apache/tajo/datum/TimeDatum.java | 22 +-
.../org/apache/tajo/datum/TimestampDatum.java | 45 +-
.../org/apache/tajo/storage/EmptyTuple.java | 141 +-
.../java/org/apache/tajo/storage/NullTuple.java | 43 +-
.../java/org/apache/tajo/storage/Tuple.java | 79 +-
.../java/org/apache/tajo/storage/VTuple.java | 218 +--
.../apache/tajo/util/datetime/DateTimeUtil.java | 12 +-
.../apache/tajo/datum/TestTimestampDatum.java | 2 +-
.../apache/tajo/util/TestDateTimeFormat.java | 2 +-
.../tajo/engine/function/builtin/AvgDouble.java | 10 +-
.../tajo/engine/function/builtin/AvgFloat.java | 5 +-
.../tajo/engine/function/builtin/AvgInt.java | 5 +-
.../tajo/engine/function/builtin/AvgLong.java | 10 +-
.../tajo/engine/function/builtin/Coalesce.java | 5 +-
.../tajo/engine/function/builtin/CountRows.java | 2 +-
.../engine/function/builtin/CountValue.java | 2 +-
.../function/builtin/CountValueDistinct.java | 13 +-
.../tajo/engine/function/builtin/Date.java | 2 +-
.../tajo/engine/function/builtin/LastValue.java | 5 +-
.../tajo/engine/function/builtin/Lead.java | 6 +-
.../tajo/engine/function/builtin/Max.java | 10 +-
.../tajo/engine/function/builtin/Min.java | 4 +-
.../tajo/engine/function/builtin/RandomInt.java | 2 +-
.../tajo/engine/function/builtin/SumDouble.java | 5 +-
.../function/builtin/SumDoubleDistinct.java | 10 +-
.../tajo/engine/function/builtin/SumFloat.java | 5 +-
.../function/builtin/SumFloatDistinct.java | 10 +-
.../engine/function/builtin/SumIntDistinct.java | 10 +-
.../tajo/engine/function/builtin/SumLong.java | 5 +-
.../function/builtin/SumLongDistinct.java | 10 +-
.../tajo/engine/function/builtin/Variance.java | 13 +-
.../tajo/engine/function/datetime/AddDays.java | 4 +-
.../engine/function/datetime/AddMonths.java | 6 +-
.../function/datetime/DatePartFromDate.java | 67 +-
.../function/datetime/DatePartFromTime.java | 16 +-
.../datetime/DatePartFromTimestamp.java | 14 +-
.../datetime/DateTimePartFromUnixTimestamp.java | 26 +-
.../function/datetime/ToCharTimestamp.java | 10 +-
.../tajo/engine/function/datetime/ToDate.java | 6 +-
.../function/datetime/ToTimestampInt.java | 5 +-
.../function/datetime/ToTimestampText.java | 7 +-
.../function/geoip/GeoIPCountryInet4.java | 5 +-
.../engine/function/geoip/GeoIPCountryText.java | 5 +-
.../function/geoip/GeoIPInCountryInet4.java | 6 +-
.../function/geoip/GeoIPInCountryText.java | 6 +-
.../function/json/JsonExtractPathText.java | 9 +-
.../tajo/engine/function/math/AbsDouble.java | 5 +-
.../tajo/engine/function/math/AbsFloat.java | 5 +-
.../tajo/engine/function/math/AbsInt.java | 5 +-
.../tajo/engine/function/math/AbsLong.java | 5 +-
.../apache/tajo/engine/function/math/Acos.java | 5 +-
.../apache/tajo/engine/function/math/Asin.java | 5 +-
.../apache/tajo/engine/function/math/Atan.java | 5 +-
.../apache/tajo/engine/function/math/Atan2.java | 6 +-
.../apache/tajo/engine/function/math/Cbrt.java | 5 +-
.../apache/tajo/engine/function/math/Ceil.java | 5 +-
.../apache/tajo/engine/function/math/Cos.java | 5 +-
.../tajo/engine/function/math/Degrees.java | 5 +-
.../apache/tajo/engine/function/math/Div.java | 12 +-
.../apache/tajo/engine/function/math/Exp.java | 5 +-
.../apache/tajo/engine/function/math/Floor.java | 5 +-
.../apache/tajo/engine/function/math/Mod.java | 12 +-
.../apache/tajo/engine/function/math/Pow.java | 6 +-
.../tajo/engine/function/math/Radians.java | 5 +-
.../apache/tajo/engine/function/math/Round.java | 5 +-
.../tajo/engine/function/math/RoundFloat8.java | 9 +-
.../apache/tajo/engine/function/math/Sign.java | 5 +-
.../apache/tajo/engine/function/math/Sin.java | 5 +-
.../apache/tajo/engine/function/math/Sqrt.java | 5 +-
.../apache/tajo/engine/function/math/Tan.java | 5 +-
.../tajo/engine/function/string/Ascii.java | 7 +-
.../tajo/engine/function/string/BTrim.java | 10 +-
.../tajo/engine/function/string/BitLength.java | 8 +-
.../tajo/engine/function/string/CharLength.java | 7 +-
.../apache/tajo/engine/function/string/Chr.java | 7 +-
.../tajo/engine/function/string/Concat.java | 9 +-
.../tajo/engine/function/string/Concat_ws.java | 20 +-
.../tajo/engine/function/string/Decode.java | 17 +-
.../tajo/engine/function/string/Digest.java | 8 +-
.../tajo/engine/function/string/Encode.java | 18 +-
.../tajo/engine/function/string/FindInSet.java | 21 +-
.../tajo/engine/function/string/InitCap.java | 7 +-
.../tajo/engine/function/string/LTrim.java | 10 +-
.../tajo/engine/function/string/Left.java | 14 +-
.../tajo/engine/function/string/Length.java | 5 +-
.../tajo/engine/function/string/Locate.java | 15 +-
.../tajo/engine/function/string/Lower.java | 7 +-
.../tajo/engine/function/string/Lpad.java | 23 +-
.../apache/tajo/engine/function/string/Md5.java | 11 +-
.../engine/function/string/OctetLength.java | 5 +-
.../tajo/engine/function/string/QuoteIdent.java | 6 +-
.../tajo/engine/function/string/RTrim.java | 10 +-
.../engine/function/string/RegexpReplace.java | 51 +-
.../tajo/engine/function/string/Repeat.java | 10 +-
.../tajo/engine/function/string/Reverse.java | 7 +-
.../tajo/engine/function/string/Right.java | 12 +-
.../tajo/engine/function/string/Rpad.java | 31 +-
.../tajo/engine/function/string/SplitPart.java | 9 +-
.../tajo/engine/function/string/StrPos.java | 15 +-
.../tajo/engine/function/string/StrPosb.java | 14 +-
.../tajo/engine/function/string/Substr.java | 37 +-
.../tajo/engine/function/string/ToBin.java | 5 +-
.../tajo/engine/function/string/ToCharLong.java | 4 +-
.../tajo/engine/function/string/ToHex.java | 5 +-
.../tajo/engine/function/string/Upper.java | 7 +-
.../tajo/engine/function/window/FirstValue.java | 6 +-
.../apache/tajo/engine/function/window/Lag.java | 8 +-
.../tajo/engine/function/window/Rank.java | 2 +-
.../engine/planner/RangePartitionAlgorithm.java | 66 +-
.../engine/planner/UniformRangePartition.java | 208 ++-
.../planner/physical/BSTIndexScanExec.java | 4 +
.../planner/physical/CommonHashJoinExec.java | 4 +-
.../planner/physical/ComparableVector.java | 10 +-
.../DistinctGroupbyFirstAggregationExec.java | 14 +-
.../DistinctGroupbyHashAggregationExec.java | 10 +-
.../DistinctGroupbySecondAggregationExec.java | 8 +-
.../DistinctGroupbySortAggregationExec.java | 2 +-
.../DistinctGroupbyThirdAggregationExec.java | 10 +-
.../planner/physical/HashAggregateExec.java | 4 +-
.../HashBasedColPartitionStoreExec.java | 4 +-
.../engine/planner/physical/HashJoinExec.java | 6 +-
.../planner/physical/HashLeftOuterJoinExec.java | 3 +-
.../planner/physical/HashPartitioner.java | 2 +-
.../physical/HashShuffleFileWriteExec.java | 2 +-
.../planner/physical/JoinTupleComparator.java | 4 +-
.../engine/planner/physical/PhysicalExec.java | 2 +-
.../physical/RangeShuffleFileWriteExec.java | 21 +-
.../planner/physical/SortAggregateExec.java | 9 +-
.../SortBasedColPartitionStoreExec.java | 5 +-
.../engine/planner/physical/WindowAggExec.java | 8 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 4 +-
.../NonForwardQueryResultSystemScanner.java | 19 +-
.../apache/tajo/master/exec/QueryExecutor.java | 2 +-
.../tajo/master/rm/TajoResourceTracker.java | 4 +-
.../tajo/util/TajoUncaughtExceptionHandler.java | 70 +
.../apache/tajo/util/history/HistoryWriter.java | 2 +-
.../tajo/worker/ExecutionBlockContext.java | 83 +-
.../org/apache/tajo/worker/LegacyTaskImpl.java | 844 ++++++++++
.../apache/tajo/worker/NodeResourceManager.java | 45 +-
.../apache/tajo/worker/NodeStatusUpdater.java | 34 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 47 +-
.../tajo/worker/TajoWorkerManagerService.java | 9 +-
.../main/java/org/apache/tajo/worker/Task.java | 1502 ++++++++----------
.../apache/tajo/worker/TaskAttemptContext.java | 61 +-
.../org/apache/tajo/worker/TaskContainer.java | 85 +
.../org/apache/tajo/worker/TaskExecutor.java | 194 +++
.../java/org/apache/tajo/worker/TaskImpl.java | 837 ++++++++++
.../org/apache/tajo/worker/TaskManager.java | 180 +++
.../java/org/apache/tajo/worker/TaskRunner.java | 10 +-
.../apache/tajo/worker/TaskRunnerHistory.java | 1 +
.../apache/tajo/worker/TaskRunnerManager.java | 12 +-
.../worker/event/ExecutionBlockStartEvent.java | 35 +
.../worker/event/ExecutionBlockStopEvent.java | 37 +
.../worker/event/NodeResourceAllocateEvent.java | 2 +-
.../event/NodeResourceDeallocateEvent.java | 2 +-
.../tajo/worker/event/NodeResourceEvent.java | 35 +
.../worker/event/NodeResourceManagerEvent.java | 34 -
.../tajo/worker/event/NodeStatusEvent.java | 11 +-
.../tajo/worker/event/TaskExecutorEvent.java | 44 +
.../tajo/worker/event/TaskManagerEvent.java | 43 +
.../tajo/worker/event/TaskRunnerEvent.java | 1 +
.../tajo/worker/event/TaskRunnerStartEvent.java | 44 +-
.../tajo/worker/event/TaskRunnerStopEvent.java | 1 +
.../tajo/worker/event/TaskStartEvent.java | 44 +
.../tajo/ws/rs/resources/QueryResource.java | 37 +-
.../ws/rs/resources/QueryResultResource.java | 21 +-
.../src/main/proto/TajoWorkerProtocol.proto | 1 +
.../apache/tajo/engine/eval/ExprTestBase.java | 11 +-
.../tajo/engine/eval/TestEvalTreeUtil.java | 4 +-
.../tajo/engine/eval/TestSQLExpression.java | 7 +-
.../planner/TestUniformRangePartition.java | 216 +--
.../planner/physical/TestBNLJoinExec.java | 10 +-
.../planner/physical/TestExternalSortExec.java | 2 +-
.../physical/TestFullOuterHashJoinExec.java | 6 +-
.../physical/TestFullOuterMergeJoinExec.java | 8 +-
.../planner/physical/TestHashAntiJoinExec.java | 10 +-
.../planner/physical/TestHashJoinExec.java | 10 +-
.../planner/physical/TestHashPartitioner.java | 10 +-
.../planner/physical/TestHashSemiJoinExec.java | 10 +-
.../physical/TestLeftOuterHashJoinExec.java | 6 +-
.../planner/physical/TestMergeJoinExec.java | 10 +-
.../engine/planner/physical/TestNLJoinExec.java | 10 +-
.../planner/physical/TestPhysicalPlanner.java | 68 +-
.../physical/TestProgressExternalSortExec.java | 2 +-
.../physical/TestRightOuterHashJoinExec.java | 6 +-
.../physical/TestRightOuterMergeJoinExec.java | 8 +-
.../engine/planner/physical/TestSortExec.java | 8 +-
.../planner/physical/TestTupleSorter.java | 4 +-
.../apache/tajo/engine/util/TestTupleUtil.java | 23 +-
.../TestNonForwardQueryResultSystemScanner.java | 16 +-
.../apache/tajo/querymaster/TestKillQuery.java | 135 +-
.../org/apache/tajo/storage/TestRowFile.java | 4 +-
.../apache/tajo/worker/MockExecutionBlock.java | 42 +
.../tajo/worker/MockNodeResourceManager.java | 96 ++
.../tajo/worker/MockNodeStatusUpdater.java | 4 +-
.../apache/tajo/worker/MockTaskExecutor.java | 141 ++
.../org/apache/tajo/worker/MockTaskManager.java | 59 +
.../apache/tajo/worker/MockWorkerContext.java | 129 ++
.../org/apache/tajo/worker/TestFetcher.java | 14 +-
.../tajo/worker/TestNodeResourceManager.java | 135 +-
.../tajo/worker/TestNodeStatusUpdater.java | 54 +-
.../apache/tajo/worker/TestTaskExecutor.java | 330 ++++
.../org/apache/tajo/worker/TestTaskManager.java | 185 +++
.../tajo/ws/rs/resources/TestQueryResource.java | 2 +-
.../rs/resources/TestQueryResultResource.java | 2 +-
.../org/apache/tajo/jdbc/MetaDataTuple.java | 57 +-
.../apache/tajo/jdbc/TajoMetaDataResultSet.java | 14 +-
.../org/apache/tajo/plan/ExprAnnotator.java | 2 +-
.../org/apache/tajo/plan/expr/FieldEval.java | 2 +-
.../plan/function/PythonAggFunctionInvoke.java | 2 +-
.../function/python/PythonScriptEngine.java | 4 +-
.../plan/function/stream/CSVLineSerializer.java | 12 +-
.../stream/TextFieldSerializerDeserializer.java | 4 +-
.../tajo/storage/BaseTupleComparator.java | 4 +-
.../storage/BinarySerializerDeserializer.java | 52 +-
.../org/apache/tajo/storage/FrameTuple.java | 76 +-
.../java/org/apache/tajo/storage/LazyTuple.java | 57 +-
.../org/apache/tajo/storage/RowStoreUtil.java | 42 +-
.../tajo/storage/SerializerDeserializer.java | 7 +-
.../apache/tajo/storage/TableStatistics.java | 31 +-
.../storage/TextSerializerDeserializer.java | 67 +-
.../org/apache/tajo/storage/TupleRange.java | 2 -
.../apache/tajo/tuple/offheap/HeapTuple.java | 51 +-
.../apache/tajo/tuple/offheap/UnSafeTuple.java | 55 +-
.../org/apache/tajo/storage/TestFrameTuple.java | 14 +-
.../org/apache/tajo/storage/TestLazyTuple.java | 26 +-
.../tajo/storage/TestTupleComparator.java | 7 +-
.../org/apache/tajo/storage/TestVTuple.java | 23 +-
.../tajo/tuple/offheap/TestOffHeapRowBlock.java | 26 +-
.../storage/hbase/AbstractHBaseAppender.java | 15 +-
.../HBaseBinarySerializerDeserializer.java | 35 +
.../tajo/storage/hbase/HBasePutAppender.java | 6 +-
.../tajo/storage/hbase/HBaseTablespace.java | 2 +-
.../hbase/HBaseTextSerializerDeserializer.java | 9 +
.../java/org/apache/tajo/storage/CSVFile.java | 9 +-
.../storage/FieldSerializerDeserializer.java | 7 +-
.../java/org/apache/tajo/storage/RawFile.java | 4 +-
.../java/org/apache/tajo/storage/RowFile.java | 28 +-
.../apache/tajo/storage/avro/AvroAppender.java | 5 +-
.../tajo/storage/json/JsonLineSerializer.java | 2 +-
.../tajo/storage/parquet/ParquetAppender.java | 2 +-
.../storage/parquet/TajoRecordConverter.java | 2 +-
.../tajo/storage/parquet/TajoWriteSupport.java | 26 +-
.../org/apache/tajo/storage/rcfile/RCFile.java | 22 +-
.../sequencefile/SequenceFileAppender.java | 23 +-
.../sequencefile/SequenceFileScanner.java | 3 +-
.../tajo/storage/text/CSVLineDeserializer.java | 11 +-
.../tajo/storage/text/CSVLineSerializer.java | 5 +-
.../text/TextFieldSerializerDeserializer.java | 38 +-
.../tajo/storage/TestDelimitedTextFile.java | 4 +-
.../tajo/storage/TestFileStorageManager.java | 5 +-
.../apache/tajo/storage/TestFileSystems.java | 9 +-
.../apache/tajo/storage/TestMergeScanner.java | 20 +-
.../org/apache/tajo/storage/TestStorages.java | 57 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 92 +-
.../index/TestSingleCSVFileBSTIndex.java | 24 +-
.../apache/tajo/storage/json/TestJsonSerDe.java | 3 +-
.../tajo/storage/parquet/TestReadWrite.java | 2 +-
270 files changed, 6628 insertions(+), 3121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/CHANGES
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 712fc6d,bc6975a..4ed1313
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@@ -233,15 -114,13 +233,19 @@@ public class BSTIndexScanExec extends P
}
} else {
while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
++ LOG.info("while: " + tuple);
if (qual.eval(tuple).isTrue()) {
projector.eval(tuple, outTuple);
++ LOG.info("return: " + outTuple);
return outTuple;
} else {
long offset = reader.next();
- if (offset == -1) return null;
++ LOG.info("offset: " + offset);
+ if (offset == -1) {
+ return null;
+ }
else fileScanner.seek(offset);
++ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 7cb1716,bc0d212..98c5de6
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@@ -23,14 -23,9 +23,13 @@@ import com.google.protobuf.ByteString
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
- import org.apache.tajo.SessionVars;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index ea04a48,c849940..d2afb83
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@@ -18,853 -18,35 +18,663 @@@
package org.apache.tajo.worker;
- import com.google.common.annotations.VisibleForTesting;
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
- import io.netty.handler.codec.http.QueryStringDecoder;
- import org.apache.commons.lang.exception.ExceptionUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.tajo.TajoProtos;
- import org.apache.tajo.TajoProtos.TaskAttemptState;
- import org.apache.tajo.TaskAttemptId;
- import org.apache.tajo.catalog.Schema;
- import org.apache.tajo.catalog.TableDesc;
- import org.apache.tajo.catalog.TableMeta;
- import org.apache.tajo.catalog.proto.CatalogProtos;
- import org.apache.tajo.catalog.statistics.TableStats;
- import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.engine.planner.physical.PhysicalExec;
- import org.apache.tajo.engine.query.QueryContext;
- import org.apache.tajo.engine.query.TaskRequest;
- import org.apache.tajo.ipc.QueryMasterProtocol;
- import org.apache.tajo.ipc.TajoWorkerProtocol.*;
- import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
- import org.apache.tajo.master.cluster.WorkerConnectionInfo;
- import org.apache.tajo.plan.function.python.TajoScriptEngine;
- import org.apache.tajo.plan.logical.*;
- import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
- import org.apache.tajo.plan.util.PlannerUtil;
- import org.apache.tajo.pullserver.TajoPullServerService;
- import org.apache.tajo.pullserver.retriever.FileChunk;
- import org.apache.tajo.rpc.NullCallback;
- import org.apache.tajo.storage.*;
- import org.apache.tajo.storage.fragment.FileFragment;
- import org.apache.tajo.util.NetUtils;
+ import org.apache.tajo.ipc.TajoWorkerProtocol;
- import java.io.File;
import java.io.IOException;
- import java.net.InetAddress;
- import java.net.URI;
- import java.util.*;
- import java.util.Map.Entry;
- import java.util.concurrent.ExecutorService;
- import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
- import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-
- public class Task {
- private static final Log LOG = LogFactory.getLog(Task.class);
- private static final float FETCHER_PROGRESS = 0.5f;
-
- private final TajoConf systemConf;
- private final QueryContext queryContext;
- private final ExecutionBlockContext executionBlockContext;
- private final TaskAttemptId taskId;
- private final String taskRunnerId;
-
- private final Path taskDir;
- private final TaskRequest request;
- private TaskAttemptContext context;
- private List<Fetcher> fetcherRunners;
- private LogicalNode plan;
- private final Map<String, TableDesc> descs = Maps.newHashMap();
- private PhysicalExec executor;
- private boolean interQuery;
- private Path inputTableBaseDir;
-
- private long startTime;
- private long finishTime;
-
- private final TableStats inputStats;
- private List<FileChunk> localChunks;
-
- // TODO - to be refactored
- private ShuffleType shuffleType = null;
- private Schema finalSchema = null;
- private TupleComparator sortComp = null;
-
- public Task(String taskRunnerId,
- Path baseDir,
- TaskAttemptId taskId,
- final ExecutionBlockContext executionBlockContext,
- final TaskRequest request) throws IOException {
- this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
- }
-
- public Task(String taskRunnerId,
- Path baseDir,
- TaskAttemptId taskId,
- TajoConf conf,
- final ExecutionBlockContext executionBlockContext,
- final TaskRequest request) throws IOException {
- this.taskRunnerId = taskRunnerId;
- this.request = request;
- this.taskId = taskId;
-
- this.systemConf = conf;
- this.queryContext = request.getQueryContext(systemConf);
- this.executionBlockContext = executionBlockContext;
- this.taskDir = StorageUtil.concatPath(baseDir,
- taskId.getTaskId().getId() + "_" + taskId.getId());
-
- this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
- request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
- this.context.setDataChannel(request.getDataChannel());
- this.context.setEnforcer(request.getEnforcer());
- this.context.setState(TaskAttemptState.TA_PENDING);
- this.inputStats = new TableStats();
- this.fetcherRunners = Lists.newArrayList();
- }
-
- public void initPlan() throws IOException {
- plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
- updateDescsForScanNodes(NodeType.SCAN);
- updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
- updateDescsForScanNodes(NodeType.INDEX_SCAN);
- LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
- if (scanNode != null) {
- for (LogicalNode node : scanNode) {
- ScanNode scan = (ScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
-
- LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
- if (partitionScanNode != null) {
- for (LogicalNode node : partitionScanNode) {
- PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
-
- interQuery = request.getProto().getInterQuery();
- if (interQuery) {
- context.setInterQuery();
- this.shuffleType = context.getDataChannel().getShuffleType();
-
- if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
- SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
- this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
- this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
- }
- } else {
- Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
- .getAppenderFilePath(taskId, queryContext.getStagingDir());
- LOG.info("Output File Path: " + outFilePath);
- context.setOutputPath(outFilePath);
- }
-
- this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
- LOG.info("==================================");
- LOG.info("* Stage " + request.getId() + " is initialized");
- LOG.info("* InterQuery: " + interQuery
- + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
- ", Fragments (num: " + request.getFragments().size() + ")" +
- ", Fetches (total:" + request.getFetches().size() + ") :");
-
- if(LOG.isDebugEnabled()) {
- for (FetchImpl f : request.getFetches()) {
- LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
- }
- }
- LOG.info("* Local task dir: " + taskDir);
- if(LOG.isDebugEnabled()) {
- LOG.debug("* plan:\n");
- LOG.debug(plan.toString());
- }
- LOG.info("==================================");
- }
-
- private void updateDescsForScanNodes(NodeType nodeType) {
- assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
- LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
- if (scanNodes != null) {
- for (LogicalNode node : scanNodes) {
- ScanNode scanNode = (ScanNode) node;
- descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
- }
- }
- }
-
- private void startScriptExecutors() throws IOException {
- for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
- executor.start(systemConf);
- }
- }
-
- private void stopScriptExecutors() {
- for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
- executor.shutdown();
- }
- }
-
- public void init() throws IOException {
- initPlan();
- startScriptExecutors();
-
- if (context.getState() == TaskAttemptState.TA_PENDING) {
- // initialize a task temporal dir
- FileSystem localFS = executionBlockContext.getLocalFS();
- localFS.mkdirs(taskDir);
-
- if (request.getFetches().size() > 0) {
- inputTableBaseDir = localFS.makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
- getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
- localFS.mkdirs(inputTableBaseDir);
- Path tableDir;
- for (String inputTable : context.getInputTables()) {
- tableDir = new Path(inputTableBaseDir, inputTable);
- if (!localFS.exists(tableDir)) {
- LOG.info("the directory is created " + tableDir.toUri());
- localFS.mkdirs(tableDir);
- }
- }
- }
- // for localizing the intermediate data
- fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
- }
- }
-
- public TaskAttemptId getTaskId() {
- return taskId;
- }
-
- public TaskAttemptId getId() {
- return context.getTaskId();
- }
-
- public TaskAttemptState getStatus() {
- return context.getState();
- }
-
- public String toString() {
- return "queryId: " + this.getId() + " status: " + this.getStatus();
- }
-
- public void setState(TaskAttemptState status) {
- context.setState(status);
- }
-
- public TaskAttemptContext getContext() {
- return context;
- }
-
- public boolean hasFetchPhase() {
- return fetcherRunners.size() > 0;
- }
-
- public List<Fetcher> getFetchers() {
- return new ArrayList<Fetcher>(fetcherRunners);
- }
-
- public void fetch() {
- ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
- for (Fetcher f : fetcherRunners) {
- executorService.submit(new FetchRunner(context, f));
- }
- }
-
- public void kill() {
- stopScriptExecutors();
- context.setState(TaskAttemptState.TA_KILLED);
- context.stop();
- }
-
- public void abort() {
- stopScriptExecutors();
- context.stop();
- }
-
- public void cleanUp() {
- // remove itself from worker
- if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
- synchronized (executionBlockContext.getTasks()) {
- executionBlockContext.getTasks().remove(this.getId());
- }
- } else {
- LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState());
- }
- }
-
- public TaskStatusProto getReport() {
- TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
- builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
- builder.setId(context.getTaskId().getProto())
- .setProgress(context.getProgress())
- .setState(context.getState());
-
- builder.setInputStats(reloadInputStats());
-
- if (context.getResultStats() != null) {
- builder.setResultStats(context.getResultStats().getProto());
- }
- return builder.build();
- }
-
- public boolean isRunning(){
- return context.getState() == TaskAttemptState.TA_RUNNING;
- }
-
- public boolean isProgressChanged() {
- return context.isProgressChanged();
- }
-
- public void updateProgress() {
- if(context != null && context.isStopped()){
- return;
- }
-
- if (executor != null && context.getProgress() < 1.0f) {
- context.setExecutorProgress(executor.getProgress());
- }
- }
-
- private CatalogProtos.TableStatsProto reloadInputStats() {
- synchronized(inputStats) {
- if (this.executor == null) {
- return inputStats.getProto();
- }
-
- TableStats executorInputStats = this.executor.getInputStats();
-
- if (executorInputStats != null) {
- inputStats.setValues(executorInputStats);
- }
- return inputStats.getProto();
- }
- }
-
- private TaskCompletionReport getTaskCompletionReport() {
- TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
- builder.setId(context.getTaskId().getProto());
-
- builder.setInputStats(reloadInputStats());
-
- if (context.hasResultStats()) {
- builder.setResultStats(context.getResultStats().getProto());
- } else {
- builder.setResultStats(new TableStats().getProto());
- }
-
- Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
- if (it.hasNext()) {
- do {
- Entry<Integer, String> entry = it.next();
- ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
- part.setPartId(entry.getKey());
-
- // Set output volume
- if (context.getPartitionOutputVolume() != null) {
- for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
- if (entry.getKey().equals(e.getKey())) {
- part.setVolume(e.getValue().longValue());
- break;
- }
- }
- }
-
- builder.addShuffleFileOutputs(part.build());
- } while (it.hasNext());
- }
-
- return builder.build();
- }
-
- private void waitForFetch() throws InterruptedException, IOException {
- context.getFetchLatch().await();
- LOG.info(context.getTaskId() + " All fetches are done!");
- Collection<String> inputs = Lists.newArrayList(context.getInputTables());
-
- // Get all broadcasted tables
- Set<String> broadcastTableNames = new HashSet<String>();
- List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
- if (broadcasts != null) {
- for (EnforceProperty eachBroadcast : broadcasts) {
- broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
- }
- }
-
- // localize the fetched data and skip the broadcast table
- for (String inputTable: inputs) {
- if (broadcastTableNames.contains(inputTable)) {
- continue;
- }
- File tableDir = new File(context.getFetchIn(), inputTable);
- FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
- context.updateAssignedFragments(inputTable, frags);
- }
- }
-
- public void run() throws Exception {
- startTime = System.currentTimeMillis();
- Throwable error = null;
- try {
- if(!context.isStopped()) {
- context.setState(TaskAttemptState.TA_RUNNING);
- if (context.hasFetchPhase()) {
- // If the fetch is still in progress, the query unit must wait for
- // complete.
- waitForFetch();
- context.setFetcherProgress(FETCHER_PROGRESS);
- context.setProgressChanged(true);
- updateProgress();
- }
-
- this.executor = executionBlockContext.getTQueryEngine().
- createPlan(context, plan);
- this.executor.init();
-
- while(!context.isStopped() && executor.next() != null) {
- }
- }
- } catch (Throwable e) {
- error = e ;
- LOG.error(e.getMessage(), e);
- stopScriptExecutors();
- context.stop();
- } finally {
- if (executor != null) {
- try {
- executor.close();
- reloadInputStats();
- } catch (IOException e) {
- LOG.error(e, e);
- }
- this.executor = null;
- }
-
- executionBlockContext.completedTasksNum.incrementAndGet();
- context.getHashShuffleAppenderManager().finalizeTask(taskId);
-
- QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
- if (context.isStopped()) {
- context.setExecutorProgress(0.0f);
-
- if (context.getState() == TaskAttemptState.TA_KILLED) {
- queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
- executionBlockContext.killedTasksNum.incrementAndGet();
- } else {
- context.setState(TaskAttemptState.TA_FAILED);
- TaskFatalErrorReport.Builder errorBuilder =
- TaskFatalErrorReport.newBuilder()
- .setId(getId().getProto());
- if (error != null) {
- if (error.getMessage() == null) {
- errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
- } else {
- errorBuilder.setErrorMessage(error.getMessage());
- }
- errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
- }
-
- queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
- executionBlockContext.failedTasksNum.incrementAndGet();
- }
- } else {
- // if successful
- context.setProgress(1.0f);
- context.setState(TaskAttemptState.TA_SUCCEEDED);
- executionBlockContext.succeededTasksNum.incrementAndGet();
-
- TaskCompletionReport report = getTaskCompletionReport();
- queryMasterStub.done(null, report, NullCallback.get());
- }
- finishTime = System.currentTimeMillis();
- LOG.info(context.getTaskId() + " completed. " +
- "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
- ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
- + ", killed: " + executionBlockContext.killedTasksNum.intValue()
- + ", failed: " + executionBlockContext.failedTasksNum.intValue());
- cleanupTask();
- }
- }
-
- public void cleanupTask() {
- TaskHistory taskHistory = createTaskHistory();
- executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
- executionBlockContext.getTasks().remove(getId());
-
- fetcherRunners.clear();
- fetcherRunners = null;
- try {
- if(executor != null) {
- executor.close();
- executor = null;
- }
- } catch (IOException e) {
- LOG.fatal(e.getMessage(), e);
- }
-
- executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
- stopScriptExecutors();
- }
-
- public TaskHistory createTaskHistory() {
- TaskHistory taskHistory = null;
- try {
- taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
- startTime, finishTime, reloadInputStats());
-
- if (context.getOutputPath() != null) {
- taskHistory.setOutputPath(context.getOutputPath().toString());
- }
-
- if (context.getWorkDir() != null) {
- taskHistory.setWorkingPath(context.getWorkDir().toString());
- }
-
- if (context.getResultStats() != null) {
- taskHistory.setOutputStats(context.getResultStats().getProto());
- }
-
- if (hasFetchPhase()) {
- taskHistory.setTotalFetchCount(fetcherRunners.size());
- int i = 0;
- FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
- for (Fetcher fetcher : fetcherRunners) {
- // TODO store the fetcher histories
- if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
- builder.setStartTime(fetcher.getStartTime());
- builder.setFinishTime(fetcher.getFinishTime());
- builder.setFileLength(fetcher.getFileLen());
- builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
- builder.setState(fetcher.getState());
-
- taskHistory.addFetcherHistory(builder.build());
- }
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
- }
- taskHistory.setFinishedFetchCount(i);
- }
- } catch (Exception e) {
- LOG.warn(e.getMessage(), e);
- }
-
- return taskHistory;
- }
-
- public int hashCode() {
- return context.hashCode();
- }
-
- public boolean equals(Object obj) {
- if (obj instanceof Task) {
- Task other = (Task) obj;
- return this.context.equals(other.context);
- }
- return false;
- }
-
- private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
- throws IOException {
- Configuration c = new Configuration(systemConf);
- c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
- FileSystem fs = FileSystem.get(c);
- Path tablePath = new Path(file.getAbsolutePath());
-
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus f : fileLists) {
- if (f.getLen() == 0) {
- continue;
- }
- tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
- listTablets.add(tablet);
- }
-
- // Special treatment for locally pseudo fetched chunks
- synchronized (localChunks) {
- for (FileChunk chunk : localChunks) {
- if (name.equals(chunk.getEbId())) {
- tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
- listTablets.add(tablet);
- LOG.info("One local chunk is added to listTablets");
- }
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- private class FetchRunner implements Runnable {
- private final TaskAttemptContext ctx;
- private final Fetcher fetcher;
- private int maxRetryNum;
-
- public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
- this.ctx = ctx;
- this.fetcher = fetcher;
- this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
- }
-
- @Override
- public void run() {
- int retryNum = 0;
- int retryWaitTime = 1000; //sec
-
- try { // for releasing fetch latch
- while(!context.isStopped() && retryNum < maxRetryNum) {
- if (retryNum > 0) {
- try {
- Thread.sleep(retryWaitTime);
- retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
- }
- try {
- FileChunk fetched = fetcher.get();
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
- && fetched.getFile() != null) {
- if (fetched.fromRemote() == false) {
- localChunks.add(fetched);
- LOG.info("Add a new FileChunk to local chunk list");
- }
- break;
- }
- } catch (Throwable e) {
- LOG.error("Fetch failed: " + fetcher.getURI(), e);
- }
- retryNum++;
- }
- } finally {
- if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
- fetcherFinished(ctx);
- } else {
- if (retryNum == maxRetryNum) {
- LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
- }
- stopScriptExecutors();
- context.stop(); // retry task
- ctx.getFetchLatch().countDown();
- }
- }
- }
- }
-
- @VisibleForTesting
- public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
- if (totalFetcher > 0) {
- return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
- } else {
- return 0.0f;
- }
- }
-
- private synchronized void fetcherFinished(TaskAttemptContext ctx) {
- int fetcherSize = fetcherRunners.size();
- if(fetcherSize == 0) {
- return;
- }
-
- ctx.getFetchLatch().countDown();
-
- int remainFetcher = (int) ctx.getFetchLatch().getCount();
- if (remainFetcher == 0) {
- context.setFetcherProgress(FETCHER_PROGRESS);
- } else {
- context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
- context.setProgressChanged(true);
- }
- }
-
- private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
- List<FetchImpl> fetches) throws IOException {
-
- if (fetches.size() > 0) {
- Path inputDir = executionBlockContext.getLocalDirAllocator().
- getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
-
- int i = 0;
- File storeDir;
- File defaultStoreFile;
- FileChunk storeChunk = null;
- List<Fetcher> runnerList = Lists.newArrayList();
-
- for (FetchImpl f : fetches) {
- storeDir = new File(inputDir.toString(), f.getName());
- if (!storeDir.exists()) {
- storeDir.mkdirs();
- }
-
- for (URI uri : f.getURIs()) {
- defaultStoreFile = new File(storeDir, "in_" + i);
- InetAddress address = InetAddress.getByName(uri.getHost());
-
- WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
- if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
- boolean hasError = false;
- try {
- LOG.info("Try to get local file chunk at local host");
- storeChunk = getLocalStoredFileChunk(uri, systemConf);
- } catch (Throwable t) {
- hasError = true;
- }
-
- // When a range request is out of range, storeChunk will be NULL. This case is normal state.
- // So, we should skip and don't need to create storeChunk.
- if (storeChunk == null && !hasError) {
- continue;
- }
-
- if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
- && hasError == false) {
- storeChunk.setFromRemote(false);
- } else {
- storeChunk = new FileChunk(defaultStoreFile, 0, -1);
- storeChunk.setFromRemote(true);
- }
- } else {
- storeChunk = new FileChunk(defaultStoreFile, 0, -1);
- storeChunk.setFromRemote(true);
- }
-
- // If we decide that intermediate data should be really fetched from a remote host, storeChunk
- // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
- storeChunk.setEbId(f.getName());
- Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
- LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
- runnerList.add(fetcher);
- i++;
- }
- }
- ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
- return runnerList;
- } else {
- return Lists.newArrayList();
- }
- }
-
- private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
- // Parse the URI
- LOG.info("getLocalStoredFileChunk starts");
- final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
- final List<String> types = params.get("type");
- final List<String> qids = params.get("qid");
- final List<String> taskIdList = params.get("ta");
- final List<String> stageIds = params.get("sid");
- final List<String> partIds = params.get("p");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
-
- if (types == null || stageIds == null || qids == null || partIds == null) {
- LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
- return null;
- }
-
- if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
- LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
- return null;
- }
-
- String queryId = qids.get(0);
- String shuffleType = types.get(0);
- String sid = stageIds.get(0);
- String partId = partIds.get(0);
-
- if (shuffleType.equals("r") && taskIdList == null) {
- LOG.error("Invalid URI - For range shuffle, taskId is required");
- return null;
- }
- List<String> taskIds = splitMaps(taskIdList);
-
- FileChunk chunk = null;
- long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
- long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
- LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
- + ", taskIds=" + taskIdList);
-
- // The working directory of Tajo worker for each query, including stage
- String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
-
- // If the stage requires a range shuffle
- if (shuffleType.equals("r")) {
- String ta = taskIds.get(0);
- if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
- LOG.warn("Range shuffle - file not exist");
- return null;
- }
- Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
- String startKey = params.get("start").get(0);
- String endKey = params.get("end").get(0);
- boolean last = params.get("final") != null;
-
- try {
- chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("getFileChunks() throws exception");
- return null;
- }
-
- // If the stage requires a hash shuffle or a scattered hash shuffle
- } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
- String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
- if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
- LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
- return null;
- }
- Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
- File file = new File(path.toUri());
- long startPos = (offset >= 0 && length >= 0) ? offset : 0;
- long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
- if (startPos >= file.length()) {
- LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
- return null;
- }
- chunk = new FileChunk(file, startPos, readLen);
-
- } else {
- LOG.error("Unknown shuffle type");
- return null;
- }
-
- return chunk;
- }
-
- private List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<String>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
- public static Path getTaskAttemptDir(TaskAttemptId quid) {
- Path workDir =
- StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
- String.valueOf(quid.getTaskId().getId()),
- String.valueOf(quid.getId()));
- return workDir;
- }
+ public interface Task {
+
+ void init() throws IOException;
+
+ void fetch();
+
+ void run() throws Exception;
+
+ void kill();
+
+ void abort();
+
+ void cleanup();
+
+ boolean hasFetchPhase();
+
+ boolean isProgressChanged();
+
+ boolean isStopped();
+
+ void updateProgress();
+
+ TaskAttemptContext getTaskContext();
+
+ ExecutionBlockContext getExecutionBlockContext();
+
++//<<<<<<< HEAD
++//
++// public TaskAttemptId getTaskId() {
++// return taskId;
++// }
++//
++// public TaskAttemptId getId() {
++// return context.getTaskId();
++// }
++//
++// public TaskAttemptState getStatus() {
++// return context.getState();
++// }
++//
++// public String toString() {
++// return "queryId: " + this.getId() + " status: " + this.getStatus();
++// }
++//
++// public void setState(TaskAttemptState status) {
++// context.setState(status);
++// }
++//
++// public TaskAttemptContext getContext() {
++// return context;
++// }
++//
++// public boolean hasFetchPhase() {
++// return fetcherRunners.size() > 0;
++// }
++//
++// public List<Fetcher> getFetchers() {
++// return new ArrayList<Fetcher>(fetcherRunners);
++// }
++//
++// public void fetch() {
++// ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
++// for (Fetcher f : fetcherRunners) {
++// executorService.submit(new FetchRunner(context, f));
++// }
++// }
++//
++// public void kill() {
++// stopScriptExecutors();
++// context.setState(TaskAttemptState.TA_KILLED);
++// context.stop();
++// }
++//
++// public void abort() {
++// stopScriptExecutors();
++// context.stop();
++// }
++//
++// public void cleanUp() {
++// // remove itself from worker
++// if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
++// synchronized (executionBlockContext.getTasks()) {
++// executionBlockContext.getTasks().remove(this.getId());
++// }
++// } else {
++// LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState());
++// }
++// }
++//
++// public TaskStatusProto getReport() {
++// TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
++// builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
++// builder.setId(context.getTaskId().getProto())
++// .setProgress(context.getProgress())
++// .setState(context.getState());
++//
++// builder.setInputStats(reloadInputStats());
++//
++// if (context.getResultStats() != null) {
++// builder.setResultStats(context.getResultStats().getProto());
++// }
++// return builder.build();
++// }
++//
++// public boolean isRunning(){
++// return context.getState() == TaskAttemptState.TA_RUNNING;
++// }
++//
++// public boolean isProgressChanged() {
++// return context.isProgressChanged();
++// }
++//
++// public void updateProgress() {
++// if(context != null && context.isStopped()){
++// return;
++// }
++//
++// if (executor != null && context.getProgress() < 1.0f) {
++// context.setExecutorProgress(executor.getProgress());
++// }
++// }
++//
++// private CatalogProtos.TableStatsProto reloadInputStats() {
++// synchronized(inputStats) {
++// if (this.executor == null) {
++// return inputStats.getProto();
++// }
++//
++// TableStats executorInputStats = this.executor.getInputStats();
++//
++// if (executorInputStats != null) {
++// inputStats.setValues(executorInputStats);
++// }
++// return inputStats.getProto();
++// }
++// }
++//
++// private TaskCompletionReport getTaskCompletionReport() {
++// TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
++// builder.setId(context.getTaskId().getProto());
++//
++// builder.setInputStats(reloadInputStats());
++//
++// if (context.hasResultStats()) {
++// builder.setResultStats(context.getResultStats().getProto());
++// } else {
++// builder.setResultStats(new TableStats().getProto());
++// }
++//
++// Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
++// if (it.hasNext()) {
++// do {
++// Entry<Integer, String> entry = it.next();
++// ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
++// part.setPartId(entry.getKey());
++//
++// // Set output volume
++// if (context.getPartitionOutputVolume() != null) {
++// for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
++// if (entry.getKey().equals(e.getKey())) {
++// part.setVolume(e.getValue().longValue());
++// break;
++// }
++// }
++// }
++//
++// builder.addShuffleFileOutputs(part.build());
++// } while (it.hasNext());
++// }
++//
++// return builder.build();
++// }
++//
++// private void waitForFetch() throws InterruptedException, IOException {
++// context.getFetchLatch().await();
++// LOG.info(context.getTaskId() + " All fetches are done!");
++// Collection<String> inputs = Lists.newArrayList(context.getInputTables());
++//
++// // Get all broadcasted tables
++// Set<String> broadcastTableNames = new HashSet<String>();
++// List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
++// if (broadcasts != null) {
++// for (EnforceProperty eachBroadcast : broadcasts) {
++// broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
++// }
++// }
++//
++// // localize the fetched data and skip the broadcast table
++// for (String inputTable: inputs) {
++// if (broadcastTableNames.contains(inputTable)) {
++// continue;
++// }
++// File tableDir = new File(context.getFetchIn(), inputTable);
++// FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
++// context.updateAssignedFragments(inputTable, frags);
++// }
++// }
++//
++// public void run() throws Exception {
++// startTime = System.currentTimeMillis();
++// Throwable error = null;
++// try {
++// if(!context.isStopped()) {
++// context.setState(TaskAttemptState.TA_RUNNING);
++// if (context.hasFetchPhase()) {
++// // If the fetch is still in progress, the query unit must wait for
++// // complete.
++// waitForFetch();
++// context.setFetcherProgress(FETCHER_PROGRESS);
++// context.setProgressChanged(true);
++// updateProgress();
++// }
++//
++// this.executor = executionBlockContext.getTQueryEngine().
++// createPlan(context, plan);
++// this.executor.init();
++//
++// while(!context.isStopped() && executor.next() != null) {
++// }
++// }
++// } catch (Throwable e) {
++// error = e ;
++// LOG.error(e.getMessage(), e);
++// stopScriptExecutors();
++// context.stop();
++// } finally {
++// if (executor != null) {
++// try {
++// executor.close();
++// reloadInputStats();
++// } catch (IOException e) {
++// LOG.error(e, e);
++// }
++// this.executor = null;
++// }
++//
++// executionBlockContext.completedTasksNum.incrementAndGet();
++// context.getHashShuffleAppenderManager().finalizeTask(taskId);
++//
++// QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
++// if (context.isStopped()) {
++// context.setExecutorProgress(0.0f);
++//
++// if (context.getState() == TaskAttemptState.TA_KILLED) {
++// queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
++// executionBlockContext.killedTasksNum.incrementAndGet();
++// } else {
++// context.setState(TaskAttemptState.TA_FAILED);
++// TaskFatalErrorReport.Builder errorBuilder =
++// TaskFatalErrorReport.newBuilder()
++// .setId(getId().getProto());
++// if (error != null) {
++// if (error.getMessage() == null) {
++// errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
++// } else {
++// errorBuilder.setErrorMessage(error.getMessage());
++// }
++// errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
++// }
++//
++// queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
++// executionBlockContext.failedTasksNum.incrementAndGet();
++// }
++// } else {
++// // if successful
++// context.setProgress(1.0f);
++// context.setState(TaskAttemptState.TA_SUCCEEDED);
++// executionBlockContext.succeededTasksNum.incrementAndGet();
++//
++// TaskCompletionReport report = getTaskCompletionReport();
++// queryMasterStub.done(null, report, NullCallback.get());
++// }
++// finishTime = System.currentTimeMillis();
++// LOG.info(context.getTaskId() + " completed. " +
++// "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
++// ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
++// + ", killed: " + executionBlockContext.killedTasksNum.intValue()
++// + ", failed: " + executionBlockContext.failedTasksNum.intValue());
++// cleanupTask();
++// }
++// }
++//
++// public void cleanupTask() {
++// TaskHistory taskHistory = createTaskHistory();
++// executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
++// executionBlockContext.getTasks().remove(getId());
++//
++// fetcherRunners.clear();
++// fetcherRunners = null;
++// try {
++// if(executor != null) {
++// executor.close();
++// executor = null;
++// }
++// } catch (IOException e) {
++// LOG.fatal(e.getMessage(), e);
++// }
++//
++// executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
++// stopScriptExecutors();
++// }
++//
++// public TaskHistory createTaskHistory() {
++// TaskHistory taskHistory = null;
++// try {
++// taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
++// startTime, finishTime, reloadInputStats());
++//
++// if (context.getOutputPath() != null) {
++// taskHistory.setOutputPath(context.getOutputPath().toString());
++// }
++//
++// if (context.getWorkDir() != null) {
++// taskHistory.setWorkingPath(context.getWorkDir().toString());
++// }
++//
++// if (context.getResultStats() != null) {
++// taskHistory.setOutputStats(context.getResultStats().getProto());
++// }
++//
++// if (hasFetchPhase()) {
++// taskHistory.setTotalFetchCount(fetcherRunners.size());
++// int i = 0;
++// FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
++// for (Fetcher fetcher : fetcherRunners) {
++// // TODO store the fetcher histories
++// if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
++// builder.setStartTime(fetcher.getStartTime());
++// builder.setFinishTime(fetcher.getFinishTime());
++// builder.setFileLength(fetcher.getFileLen());
++// builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
++// builder.setState(fetcher.getState());
++//
++// taskHistory.addFetcherHistory(builder.build());
++// }
++// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
++// }
++// taskHistory.setFinishedFetchCount(i);
++// }
++// } catch (Exception e) {
++// LOG.warn(e.getMessage(), e);
++// }
++//
++// return taskHistory;
++// }
++//
++// public int hashCode() {
++// return context.hashCode();
++// }
++//
++// public boolean equals(Object obj) {
++// if (obj instanceof Task) {
++// Task other = (Task) obj;
++// return this.context.equals(other.context);
++// }
++// return false;
++// }
++//
++// private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
++// throws IOException {
++// Configuration c = new Configuration(systemConf);
++// c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
++// FileSystem fs = FileSystem.get(c);
++// Path tablePath = new Path(file.getAbsolutePath());
++//
++// List<FileFragment> listTablets = new ArrayList<FileFragment>();
++// FileFragment tablet;
++//
++// FileStatus[] fileLists = fs.listStatus(tablePath);
++// for (FileStatus f : fileLists) {
++// if (f.getLen() == 0) {
++// continue;
++// }
++// tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
++// listTablets.add(tablet);
++// }
++//
++// // Special treatment for locally pseudo fetched chunks
++// synchronized (localChunks) {
++// for (FileChunk chunk : localChunks) {
++// if (name.equals(chunk.getEbId())) {
++// tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
++// listTablets.add(tablet);
++// LOG.info("One local chunk is added to listTablets");
++// }
++// }
++// }
++//
++// FileFragment[] tablets = new FileFragment[listTablets.size()];
++// listTablets.toArray(tablets);
++//
++// return tablets;
++// }
++//
++// private class FetchRunner implements Runnable {
++// private final TaskAttemptContext ctx;
++// private final Fetcher fetcher;
++// private int maxRetryNum;
++//
++// public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
++// this.ctx = ctx;
++// this.fetcher = fetcher;
++// this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
++// }
++//
++// @Override
++// public void run() {
++// int retryNum = 0;
++// int retryWaitTime = 1000; //sec
++//
++// try { // for releasing fetch latch
++// while(!context.isStopped() && retryNum < maxRetryNum) {
++// if (retryNum > 0) {
++// try {
++// Thread.sleep(retryWaitTime);
++// retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds
++// } catch (InterruptedException e) {
++// LOG.error(e);
++// }
++// LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
++// }
++// try {
++// FileChunk fetched = fetcher.get();
++// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
++// && fetched.getFile() != null) {
++// if (fetched.fromRemote() == false) {
++// localChunks.add(fetched);
++// LOG.info("Add a new FileChunk to local chunk list");
++// }
++// break;
++// }
++// } catch (Throwable e) {
++// LOG.error("Fetch failed: " + fetcher.getURI(), e);
++// }
++// retryNum++;
++// }
++// } finally {
++// if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
++// fetcherFinished(ctx);
++// } else {
++// if (retryNum == maxRetryNum) {
++// LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
++// }
++// stopScriptExecutors();
++// context.stop(); // retry task
++// ctx.getFetchLatch().countDown();
++// }
++// }
++// }
++// }
++//
++// @VisibleForTesting
++// public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
++// if (totalFetcher > 0) {
++// return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
++// } else {
++// return 0.0f;
++// }
++// }
++//
++// private synchronized void fetcherFinished(TaskAttemptContext ctx) {
++// int fetcherSize = fetcherRunners.size();
++// if(fetcherSize == 0) {
++// return;
++// }
++//
++// ctx.getFetchLatch().countDown();
++//
++// int remainFetcher = (int) ctx.getFetchLatch().getCount();
++// if (remainFetcher == 0) {
++// context.setFetcherProgress(FETCHER_PROGRESS);
++// } else {
++// context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
++// context.setProgressChanged(true);
++// }
++// }
++//
++// private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
++// List<FetchImpl> fetches) throws IOException {
++//
++// if (fetches.size() > 0) {
++// Path inputDir = executionBlockContext.getLocalDirAllocator().
++// getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
++//
++// int i = 0;
++// File storeDir;
++// File defaultStoreFile;
++// FileChunk storeChunk = null;
++// List<Fetcher> runnerList = Lists.newArrayList();
++//
++// for (FetchImpl f : fetches) {
++// storeDir = new File(inputDir.toString(), f.getName());
++// if (!storeDir.exists()) {
++// storeDir.mkdirs();
++// }
++//
++// for (URI uri : f.getURIs()) {
++// defaultStoreFile = new File(storeDir, "in_" + i);
++// InetAddress address = InetAddress.getByName(uri.getHost());
++//
++// WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
++// if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
++// boolean hasError = false;
++// try {
++// LOG.info("Try to get local file chunk at local host");
++// storeChunk = getLocalStoredFileChunk(uri, systemConf);
++// } catch (Throwable t) {
++// hasError = true;
++// }
++//
++// // When a range request is out of range, storeChunk will be NULL. This case is normal state.
++// // So, we should skip and don't need to create storeChunk.
++// if (storeChunk == null && !hasError) {
++// continue;
++// }
++//
++// if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
++// && hasError == false) {
++// storeChunk.setFromRemote(false);
++// } else {
++// storeChunk = new FileChunk(defaultStoreFile, 0, -1);
++// storeChunk.setFromRemote(true);
++// }
++// } else {
++// storeChunk = new FileChunk(defaultStoreFile, 0, -1);
++// storeChunk.setFromRemote(true);
++// }
++//
++// // If we decide that intermediate data should be really fetched from a remote host, storeChunk
++// // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
++// storeChunk.setEbId(f.getName());
++// Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
++// LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
++// runnerList.add(fetcher);
++// i++;
++// }
++// }
++// ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
++// return runnerList;
++// } else {
++// return Lists.newArrayList();
++// }
++// }
++//
++// private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
++// // Parse the URI
++// LOG.info("getLocalStoredFileChunk starts");
++// final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
++// final List<String> types = params.get("type");
++// final List<String> qids = params.get("qid");
++// final List<String> taskIdList = params.get("ta");
++// final List<String> stageIds = params.get("sid");
++// final List<String> partIds = params.get("p");
++// final List<String> offsetList = params.get("offset");
++// final List<String> lengthList = params.get("length");
++//
++// if (types == null || stageIds == null || qids == null || partIds == null) {
++// LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
++// return null;
++// }
++//
++// if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
++// LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
++// return null;
++// }
++//
++// String queryId = qids.get(0);
++// String shuffleType = types.get(0);
++// String sid = stageIds.get(0);
++// String partId = partIds.get(0);
++//
++// if (shuffleType.equals("r") && taskIdList == null) {
++// LOG.error("Invalid URI - For range shuffle, taskId is required");
++// return null;
++// }
++// List<String> taskIds = splitMaps(taskIdList);
++//
++// FileChunk chunk = null;
++// long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
++// long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
++//
++// LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
++// + ", taskIds=" + taskIdList);
++//
++// // The working directory of Tajo worker for each query, including stage
++// String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
++//
++// // If the stage requires a range shuffle
++// if (shuffleType.equals("r")) {
++// String ta = taskIds.get(0);
++// if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
++// LOG.warn("Range shuffle - file not exist");
++// return null;
++// }
++// Path path = executionBlockContext.getLocalFS().makeQualified(
++// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
++// String startKey = params.get("start").get(0);
++// String endKey = params.get("end").get(0);
++// boolean last = params.get("final") != null;
++//
++// try {
++// chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
++// } catch (Throwable t) {
++// LOG.error("getFileChunks() throws exception");
++// return null;
++// }
++//
++// // If the stage requires a hash shuffle or a scattered hash shuffle
++// } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
++// int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
++// String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
++// if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
++// LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
++// return null;
++// }
++// Path path = executionBlockContext.getLocalFS().makeQualified(
++// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
++// File file = new File(path.toUri());
++// long startPos = (offset >= 0 && length >= 0) ? offset : 0;
++// long readLen = (offset >= 0 && length >= 0) ? length : file.length();
++//
++// if (startPos >= file.length()) {
++// LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
++// return null;
++// }
++// chunk = new FileChunk(file, startPos, readLen);
++//
++// } else {
++// LOG.error("Unknown shuffle type");
++// return null;
++// }
++//
++// return chunk;
++// }
++//
++// private List<String> splitMaps(List<String> mapq) {
++// if (null == mapq) {
++// return null;
++// }
++// final List<String> ret = new ArrayList<String>();
++// for (String s : mapq) {
++// Collections.addAll(ret, s.split(","));
++// }
++// return ret;
++// }
++//
++// public static Path getTaskAttemptDir(TaskAttemptId quid) {
++// Path workDir =
++// StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
++// String.valueOf(quid.getTaskId().getId()),
++// String.valueOf(quid.getId()));
++// return workDir;
++// }
++//=======
+ TajoWorkerProtocol.TaskStatusProto getReport();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 5a62487,d020639..5d7a53a
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@@ -381,20 -367,9 +367,20 @@@ public class TaskAttemptContext
}
return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
}
-
+
+ public String getUniqueKeyFromFragments() { // Builds a single key string from every fragment held in fragmentMap.
+ StringBuilder sb = new StringBuilder();
+ for (List<FragmentProto> fragments : fragmentMap.values()) { // NOTE(review): order follows fragmentMap.values(); the map type is not visible here - confirm it is stable.
+ for (FragmentProto f : fragments) {
+ FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f); // NOTE(review): assumes every fragment can be converted to a FileFragment - confirm.
+ sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength()); // File name, start key and length are appended back to back with no separator.
+ }
+ }
+ return sb.toString(); // Empty string when fragmentMap holds no fragments.
+ }
+
public int hashCode() {
- return Objects.hashCode(queryId);
+ return Objects.hashCode(taskId);
}
public boolean equals(Object obj) {