You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/25 11:28:27 UTC
[1/6] tajo git commit: TAJO-1259: Change tsql history behavior.
(Jaewoong Jung via hyunsik)
Repository: tajo
Updated Branches:
refs/heads/index_support 3b5c28711 -> 7a38895d4
TAJO-1259: Change tsql history behavior. (Jaewoong Jung via hyunsik)
Closes #316
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c39ed5dc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c39ed5dc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c39ed5dc
Branch: refs/heads/index_support
Commit: c39ed5dc9e5b6ec80a87baf08d79fbb640fb886f
Parents: 3c833e2
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 24 14:40:45 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 24 14:40:45 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 ++
.../main/java/org/apache/tajo/cli/tsql/TajoCli.java | 14 +++++++++-----
.../org/apache/tajo/cli/tsql/TajoFileHistory.java | 8 +++-----
3 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/c39ed5dc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 29b0c0b..1bc8b9e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,8 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1259: Change tsql history behavior. (Jaewoong Jung via hyunsik)
+
TAJO-1261: Separate query and ddl execution codes from GlobalEngine.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/c39ed5dc/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index 6c5006e..fe9a005 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -338,6 +338,8 @@ public class TajoCli {
String historyPath = HOME_DIR + File.separator + HISTORY_FILE;
if ((new File(HOME_DIR)).exists()) {
history = new TajoFileHistory(new File(historyPath));
+ history.setAutoTrim(false);
+ history.setIgnoreDuplicates(false);
reader.setHistory(history);
} else {
System.err.println(ERROR_PREFIX + "home directory : '" + HOME_DIR +"' does not exist.");
@@ -391,6 +393,7 @@ public class TajoCli {
String line;
String currentPrompt = context.getCurrentDatabase();
int exitCode;
+ ParsingState latestState = SimpleParser.START_STATE;
sout.write("Try \\? for help.\n");
@@ -407,14 +410,15 @@ public class TajoCli {
} else {
List<ParsedResult> parsedResults = parser.parseLines(line);
- if (parsedResults.size() > 0) {
- for (ParsedResult parsed : parsedResults) {
- history.addStatement(parsed.getHistoryStatement() + (parsed.getType() == STATEMENT ? ";" : ""));
- }
+ if (latestState != ParsingState.TOK_START && parsedResults.size() > 0) {
+ // Add multi-line statements to history in addition to individual lines.
+ ParsedResult parsed = parsedResults.get(0);
+ history.add(parsed.getHistoryStatement() + (parsed.getType() == STATEMENT ? ";" : ""));
}
exitCode = executeParsedResults(parsedResults);
- currentPrompt = updatePrompt(parser.getState());
+ latestState = parser.getState();
+ currentPrompt = updatePrompt(latestState);
// if at least one failed
if (exitCode != 0) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/c39ed5dc/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
index ec0275c..9b1a5b8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
@@ -29,11 +29,9 @@ public class TajoFileHistory extends FileHistory {
super(file);
}
+ @Override
public void add(CharSequence item) {
- // skip add
- }
-
- public void addStatement(String item) {
- internalAdd(item);
+ // TODO: Filter out the quit command. Users wouldn't want it in the history.
+ super.add(item);
}
}
[2/6] tajo git commit: TAJO-1249: Tajo should check if a file format
given in DDL is supported. (DaeMyung Kang via hyunsik)
Posted by ji...@apache.org.
TAJO-1249: Tajo should check if a file format given in DDL is supported. (DaeMyung Kang via hyunsik)
Closes #313
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/09cad22e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/09cad22e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/09cad22e
Branch: refs/heads/index_support
Commit: 09cad22ee2f7c30a0f28ebe50e00612777e69c86
Parents: c39ed5d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 24 15:07:44 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 24 15:07:44 2014 +0900
----------------------------------------------------------------------
CHANGES | 7 +++++--
.../apache/tajo/engine/eval/TestPredicates.java | 7 -------
.../tajo/engine/planner/TestQueryValidation.java | 6 ++++++
.../TestQueryValidation/invalid_store_format.sql | 1 +
.../plan/verifier/PreLogicalPlanVerifier.java | 18 ++++++++++++++----
5 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1bc8b9e..23d34d4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -120,11 +120,14 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1249: Tajo should check if a file format given in DDL is supported.
+ (DaeMyung Kang via hyunsik)
+
TAJO-1250: RawFileAppender occasionally causes BufferOverflowException.
(jinho)
- TAJO-1259: A title in catalog configuration document is different from others.
- (Jongyoung Park via hyunsik)
+ TAJO-1259: A title in catalog configuration document is different from
+ others. (Jongyoung Park via hyunsik)
TAJO-1232: Implicit groupby queries with LIMIT lead to wrong results.
(jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
index 79a287b..94d5e71 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
@@ -403,11 +403,4 @@ public class TestPredicates extends ExprTestBase {
testEval(schema, "table1", "t,f", "select not col1 is not true, not col2 is not false from table1",
new String [] {"t", "t"});
}
-
- @Test
- public void testCreateTableWithUnsupportedStoreType() throws IOException {
- testSimpleEval("create table table1 (name text, age int) using RAW;",
- new String[] {"Wrong query statement or query plan: create table table1 (name text, age int) using RAW"},
- false);
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
index 71f3f8d..b6827a2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
@@ -50,4 +50,10 @@ public class TestQueryValidation extends QueryTestCaseBase {
// See TAJO-1098
assertInvalidSQL("invalid_casewhen_1.sql");
}
+
+ @Test
+ public void testUnsupportedStoreType() throws PlanningException, IOException {
+ // See TAJO-1249
+ assertInvalidSQL("invalid_store_format.sql");
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql b/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql
new file mode 100644
index 0000000..e5307d6
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql
@@ -0,0 +1 @@
+create table table1 (name text, age int) using RAW;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
index e6ff0d8..c184fff 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
@@ -18,6 +18,7 @@
package org.apache.tajo.plan.verifier;
+import com.google.common.base.Preconditions;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
@@ -194,9 +195,12 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVer
return true;
}
- private boolean assertUnsupportedStoreType(VerificationState state, String name) {
- if (name != null && name.equals(CatalogProtos.StoreType.RAW.name())) {
- state.addVerification(String.format("Unsupported store type :%s", name));
+ private boolean assertSupportedStoreType(VerificationState state, String name) {
+ Preconditions.checkNotNull(name);
+
+ CatalogProtos.StoreType storeType = CatalogUtil.getStoreType(name);
+ if (storeType == null || storeType == CatalogProtos.StoreType.RAW) {
+ state.addVerification(String.format("Store format %s is not supported.", name));
return false;
}
return true;
@@ -248,7 +252,9 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVer
if (!expr.isIfNotExists()) {
assertRelationNoExistence(context, expr.getTableName());
}
- assertUnsupportedStoreType(context.state, expr.getStorageType());
+ if (expr.hasStorageType()) {
+ assertSupportedStoreType(context.state, expr.getStorageType());
+ }
return expr;
}
@@ -272,6 +278,10 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVer
assertRelationExistence(context, expr.getTableName());
}
+ if (expr.hasStorageType()) {
+ assertSupportedStoreType(context.state, expr.getStorageType());
+ }
+
if (child != null && child.getType() == OpType.Projection) {
Projection projection = (Projection) child;
[5/6] tajo git commit: TAJO-1266: Too many logs when writing a
parquet relation. (DaeMyung Kang via jihoon)
Posted by ji...@apache.org.
TAJO-1266: Too many logs when writing a parquet relation. (DaeMyung Kang via jihoon)
Closes #320
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4c713fb4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4c713fb4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4c713fb4
Branch: refs/heads/index_support
Commit: 4c713fb40464f651994926d3f56205e2e7cc2552
Parents: 533e709
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 25 19:23:53 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 25 19:23:53 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +++
.../thirdparty/parquet/ColumnChunkPageWriteStore.java | 6 +++---
.../parquet/InternalParquetRecordReader.java | 14 ++++++++------
.../parquet/InternalParquetRecordWriter.java | 4 ++--
4 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 124977d..9cd2a48 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1266: Too many logs when writing a parquet relation.
+ (DaeMyung Kang via jihoon)
+
TAJO-1268: tajo-client module should not use UserGroupInformation.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
index 0dedd9b..91d4748 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
@@ -36,7 +36,7 @@ import java.io.IOException;
import java.util.*;
import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.INFO;
+import static parquet.Log.DEBUG;
class ColumnChunkPageWriteStore implements PageWriteStore {
private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
@@ -140,8 +140,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
}
writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
writer.endColumn();
- if (INFO) {
- LOG.info(
+ if (DEBUG) {
+ LOG.debug(
String.format(
"written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
index 6bbd7b5..10ac6de 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -93,14 +93,14 @@ class InternalParquetRecordReader<T> {
if (current != 0) {
long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
totalTimeSpentProcessingRecords += timeAssembling;
- LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+ if (DEBUG) LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
- LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+ if (DEBUG) LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
}
- LOG.info("at row " + current + ". reading next block");
+ if (DEBUG) LOG.debug("at row " + current + ". reading next block");
long t0 = System.currentTimeMillis();
PageReadStore pages = reader.readNextRowGroup();
if (pages == null) {
@@ -109,8 +109,10 @@ class InternalParquetRecordReader<T> {
long timeSpentReading = System.currentTimeMillis() - t0;
totalTimeSpentReadingBytes += timeSpentReading;
BenchmarkCounter.incrementTime(timeSpentReading);
- LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
- if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+ if (DEBUG) {
+ LOG.debug("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+ LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+ }
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
@@ -153,7 +155,7 @@ class InternalParquetRecordReader<T> {
for (BlockMetaData block : blocks) {
total += block.getRowCount();
}
- LOG.info("RecordReader initialized will read a total of " + total + " records.");
+ if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + total + " records.");
}
private boolean contains(GroupType group, String[] path, int index) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
index 532d9a2..da57745 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -124,7 +124,7 @@ class InternalParquetRecordWriter<T> {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = store.memSize();
if (memSize > blockSize) {
- LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
+ if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
flushStore();
initStore();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
@@ -145,7 +145,7 @@ class InternalParquetRecordWriter<T> {
private void flushStore()
throws IOException {
- LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
+ if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
if (store.allocatedSize() > 3 * blockSize) {
LOG.warn("Too much memory used: " + store.memUsageString());
}
[6/6] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7a38895d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7a38895d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7a38895d
Branch: refs/heads/index_support
Commit: 7a38895d4b15e7f7591c3591952394dbe2a5492a
Parents: 3b5c287 4c713fb
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 25 19:28:16 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 25 19:28:16 2014 +0900
----------------------------------------------------------------------
CHANGES | 17 +-
.../org/apache/tajo/cli/tools/TajoDump.java | 8 +-
.../java/org/apache/tajo/cli/tsql/TajoCli.java | 14 +-
.../apache/tajo/cli/tsql/TajoFileHistory.java | 8 +-
.../org/apache/tajo/client/QueryClient.java | 4 +-
.../org/apache/tajo/client/QueryClientImpl.java | 4 +-
.../apache/tajo/client/SessionConnection.java | 10 +-
.../java/org/apache/tajo/auth/UserRoleInfo.java | 80 +++
.../java/org/apache/tajo/util/PlatformUtil.java | 65 +++
.../DefaultFragmentScheduleAlgorithm.java | 251 ---------
.../tajo/master/FragmentScheduleAlgorithm.java | 38 --
.../FragmentScheduleAlgorithmFactory.java | 68 ---
.../master/GreedyFragmentScheduleAlgorithm.java | 429 ---------------
.../apache/tajo/master/LazyTaskScheduler.java | 529 -------------------
.../querymaster/QueryMasterManagerService.java | 6 +-
tajo-core/src/main/resources/tajo-default.xml | 7 +-
.../org/apache/tajo/cli/tools/TestTajoDump.java | 5 +-
.../apache/tajo/engine/eval/TestPredicates.java | 7 -
.../engine/planner/TestQueryValidation.java | 6 +
.../invalid_store_format.sql | 1 +
.../plan/verifier/PreLogicalPlanVerifier.java | 18 +-
.../parquet/ColumnChunkPageWriteStore.java | 6 +-
.../parquet/InternalParquetRecordReader.java | 14 +-
.../parquet/InternalParquetRecordWriter.java | 4 +-
24 files changed, 225 insertions(+), 1374 deletions(-)
----------------------------------------------------------------------
[3/6] tajo git commit: TAJO-1268: tajo-client module should not use
UserGroupInformation.
Posted by ji...@apache.org.
TAJO-1268: tajo-client module should not use UserGroupInformation.
Closes #318
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/db549655
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/db549655
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/db549655
Branch: refs/heads/index_support
Commit: db549655034c1eb79d8f5785bc00aeecfc53a593
Parents: 09cad22
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 24 17:58:25 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 24 17:58:25 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../org/apache/tajo/cli/tools/TajoDump.java | 8 +-
.../org/apache/tajo/client/QueryClient.java | 4 +-
.../org/apache/tajo/client/QueryClientImpl.java | 4 +-
.../apache/tajo/client/SessionConnection.java | 10 +--
.../java/org/apache/tajo/auth/UserRoleInfo.java | 80 ++++++++++++++++++++
.../java/org/apache/tajo/util/PlatformUtil.java | 65 ++++++++++++++++
.../org/apache/tajo/cli/tools/TestTajoDump.java | 5 +-
8 files changed, 163 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 23d34d4..2ee58a2 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1268: tajo-client module should not use UserGroupInformation.
+ (hyunsik)
+
TAJO-1259: Change tsql history behavior. (Jaewoong Jung via hyunsik)
TAJO-1261: Separate query and ddl execution codes from GlobalEngine.
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
index d05564a..750ead0 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
@@ -20,7 +20,7 @@ package org.apache.tajo.cli.tools;
import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.DDLBuilder;
import org.apache.tajo.catalog.TableDesc;
@@ -85,7 +85,7 @@ public class TajoDump {
final Pair<String, Integer> hostAndPort = getConnectionAddr(conf, cmd);
final String hostName = hostAndPort.getFirst();
final Integer port = hostAndPort.getSecond();
- final UserGroupInformation userInfo = UserGroupInformation.getCurrentUser();
+ final UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
String baseDatabaseName = null;
if (cmd.getArgList().size() > 0) {
@@ -117,7 +117,7 @@ public class TajoDump {
System.exit(0);
}
- public static void dump(TajoClient client, UserGroupInformation userInfo, String baseDatabaseName,
+ public static void dump(TajoClient client, UserRoleInfo userInfo, String baseDatabaseName,
boolean isDumpingAllDatabases, boolean includeUserName, boolean includeDate, PrintWriter out)
throws SQLException, ServiceException {
printHeader(out, userInfo, includeUserName, includeDate);
@@ -136,7 +136,7 @@ public class TajoDump {
out.flush();
}
- private static void printHeader(PrintWriter writer, UserGroupInformation userInfo, boolean includeUSerName,
+ private static void printHeader(PrintWriter writer, UserRoleInfo userInfo, boolean includeUSerName,
boolean includeDate) {
writer.write("--\n");
writer.write("-- Tajo database dump\n");
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
index 73a4d35..7c7db33 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
@@ -19,8 +19,8 @@
package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.QueryId;
+import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
@@ -49,7 +49,7 @@ public interface QueryClient extends Closeable {
@Override
public void close();
- public UserGroupInformation getUserInfo();
+ public UserRoleInfo getUserInfo();
/**
* Call to QueryMaster closing query resources
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index dc35968..f923965 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -21,11 +21,11 @@ package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
@@ -89,7 +89,7 @@ public class QueryClientImpl implements QueryClient {
}
@Override
- public UserGroupInformation getUserInfo() {
+ public UserRoleInfo getUserInfo() {
return connection.getUserInfo();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index db2bd2a..c849f2d 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,11 +21,12 @@ package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -33,7 +34,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
@@ -67,7 +67,7 @@ public class SessionConnection implements Closeable {
private final String baseDatabase;
- private final UserGroupInformation userInfo;
+ private final UserRoleInfo userInfo;
volatile TajoIdProtos.SessionIdProto sessionId;
@@ -109,7 +109,7 @@ public class SessionConnection implements Closeable {
int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
// Don't share connection pool per client
connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
- userInfo = UserGroupInformation.getCurrentUser();
+ userInfo = UserRoleInfo.getCurrentUser();
this.baseDatabase = baseDatabase != null ? baseDatabase : null;
}
@@ -167,7 +167,7 @@ public class SessionConnection implements Closeable {
return conf;
}
- public UserGroupInformation getUserInfo() {
+ public UserRoleInfo getUserInfo() {
return userInfo;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java b/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java
new file mode 100644
index 0000000..54c438c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.auth;
+
+import org.apache.tajo.util.PlatformUtil;
+
+import java.lang.reflect.Method;
+
+/**
+ * Holds the name of a user on whose behalf a client acts.
+ *
+ * {@link #getCurrentUser()} builds an instance for the operating-system user by
+ * reflectively calling the platform-specific classes of the
+ * {@code com.sun.security.auth.module} package.
+ */
+public class UserRoleInfo {
+  private final String username;
+
+  /**
+   * @param username the user name to wrap; stored as given
+   */
+  public UserRoleInfo(String username) {
+    this.username = username;
+  }
+
+  /**
+   * @return the user name passed to the constructor
+   */
+  public String getUserName() {
+    return username;
+  }
+
+  @Override
+  public String toString() {
+    return "user=" + username;
+  }
+
+  /**
+   * Looks up the name of the current operating-system user.
+   *
+   * The platform is taken from {@link PlatformUtil#getOsType()}; any exception that
+   * call throws propagates unchanged because it runs outside the try block.
+   *
+   * @return a new {@code UserRoleInfo} wrapping the resolved user name
+   * @throws RuntimeException wrapping any {@link Throwable} raised while selecting,
+   *         loading, instantiating or invoking the platform class
+   */
+  public synchronized static UserRoleInfo getCurrentUser() {
+    PlatformUtil.OsType osType = PlatformUtil.getOsType();
+    String userName;
+
+    try {
+      // The switch only picks names; the reflective calls below are shared.
+      String systemClassName;
+      String accessorName;
+
+      switch (osType) {
+      case WINDOWS:
+        systemClassName = "com.sun.security.auth.module.NTSystem";
+        accessorName = "getName";
+        break;
+      case LINUX_OR_UNIX:
+      case MAC:
+        systemClassName = "com.sun.security.auth.module.UnixSystem";
+        accessorName = "getUsername";
+        break;
+      case SOLARIS:
+        systemClassName = "com.sun.security.auth.module.SolarisSystem";
+        accessorName = "getUsername";
+        break;
+
+      default:
+        throw new IllegalStateException("Unknown Operating System: " + PlatformUtil.getOsName());
+      }
+
+      Class<?> systemClass = Class.forName(systemClassName);
+      Object systemInstance = systemClass.newInstance();
+      Method accessor = systemClass.getDeclaredMethod(accessorName);
+      userName = (String) accessor.invoke(systemInstance);
+    } catch (Throwable t) {
+      // Deliberately broad: a missing class or native library surfaces as an Error.
+      throw new RuntimeException(t);
+    }
+
+    return new UserRoleInfo(userName);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java
new file mode 100644
index 0000000..dabcdba
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+/**
+ * Detects the operating system family of the running JVM from the
+ * {@code os.name} system property.
+ */
+public class PlatformUtil {
+  /** Operating system families that {@link #getOsType()} can report. */
+  public static enum OsType {
+    WINDOWS,
+    MAC,
+    LINUX_OR_UNIX,
+    SOLARIS,
+    UNKNOWN
+  }
+
+  /**
+   * Returns the {@code os.name} system property in lower case.
+   *
+   * The conversion uses the root locale so the result does not depend on the
+   * default locale (under a Turkish locale "AIX" would otherwise be lower-cased
+   * with a dotless i and fail the substring checks below).
+   *
+   * @return the lower-cased operating system name
+   */
+  public static String getOsName() {
+    return System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
+  }
+
+  /**
+   * Classifies the current operating system.
+   *
+   * The checks run in the order Windows, Mac, Unix/Linux/AIX, Solaris, and the
+   * first match wins. This method never returns {@link OsType#UNKNOWN}.
+   *
+   * @return the matching operating system family
+   * @throws RuntimeException if the operating system name matches no known family
+   */
+  public static OsType getOsType() {
+    String osName = getOsName();
+
+    if (isWindows(osName)) {
+      return OsType.WINDOWS;
+    } else if (isMac(osName)) {
+      return OsType.MAC;
+    } else if (isUnix(osName)) {
+      return OsType.LINUX_OR_UNIX;
+    } else if (isSolaris(osName)) {
+      return OsType.SOLARIS;
+    } else {
+      throw new RuntimeException("Unknown OS Type: " + osName);
+    }
+  }
+
+  private static boolean isWindows(String osName) {
+    return osName.contains("win");
+  }
+
+  private static boolean isMac(String osName) {
+    return osName.contains("mac");
+  }
+
+  private static boolean isUnix(String osName) {
+    // "aix" starts at index 0 of the lower-cased name, so a plain containment
+    // test is required; a "> 0" index comparison would never match it.
+    return osName.contains("nix") || osName.contains("nux") || osName.contains("aix");
+  }
+
+  private static boolean isSolaris(String osName) {
+    return osName.contains("sunos") || osName.contains("solaris");
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
index 29b7d3b..d6631f6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
@@ -18,9 +18,8 @@
package org.apache.tajo.cli.tools;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.cli.tools.TajoDump;
+import org.apache.tajo.auth.UserRoleInfo;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
@@ -34,7 +33,7 @@ public class TestTajoDump extends QueryTestCaseBase {
executeString("CREATE TABLE \"" + getCurrentDatabase() +
"\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)");
- UserGroupInformation userInfo = UserGroupInformation.getCurrentUser();
+ UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintWriter printWriter = new PrintWriter(bos);
TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
[4/6] tajo git commit: TAJO-1267: Remove LazyTaskScheduler. (DaeMyung
Kang via jihoon)
Posted by ji...@apache.org.
TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/533e709b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/533e709b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/533e709b
Branch: refs/heads/index_support
Commit: 533e709b75ab7cf8bc8a06b48870dcf2ebc8fe11
Parents: db54965
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 25 19:06:54 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 25 19:06:54 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../DefaultFragmentScheduleAlgorithm.java | 251 ---------
.../tajo/master/FragmentScheduleAlgorithm.java | 38 --
.../FragmentScheduleAlgorithmFactory.java | 68 ---
.../master/GreedyFragmentScheduleAlgorithm.java | 429 ---------------
.../apache/tajo/master/LazyTaskScheduler.java | 529 -------------------
.../querymaster/QueryMasterManagerService.java | 6 +-
tajo-core/src/main/resources/tajo-default.xml | 7 +-
8 files changed, 6 insertions(+), 1324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2ee58a2..124977d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -227,6 +227,8 @@ Release 0.9.1 - unreleased
TASKS
+ TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)
+
TAJO-1233: Merge hbase_storage branch to the master branch.
(Hyoungjun via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
deleted file mode 100644
index 406550d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
-
-import java.util.*;
-import java.util.Map.Entry;
-
-/**
- * DefaultFragmentScheduleAlgorithm selects a fragment randomly for the given argument.
- * For example, when getHostLocalFragment(host, disk) is called, this algorithm randomly selects a fragment among
- * the fragments which are stored at the disk of the host specified by the arguments.
- */
-public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
- private final static Log LOG = LogFactory.getLog(DefaultFragmentScheduleAlgorithm.class);
- private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
- new HashMap<String, Map<Integer, FragmentsPerDisk>>();
- private Map<String, Set<FragmentPair>> rackFragmentMapping =
- new HashMap<String, Set<FragmentPair>>();
- private int fragmentNum = 0;
- private Random random = new Random(System.currentTimeMillis());
-
- public static class FragmentsPerDisk {
- private Integer diskId;
- private Set<FragmentPair> fragmentPairSet;
-
- public FragmentsPerDisk(Integer diskId) {
- this.diskId = diskId;
- this.fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
- }
-
- public Integer getDiskId() {
- return diskId;
- }
-
- public Set<FragmentPair> getFragmentPairSet() {
- return fragmentPairSet;
- }
-
- public void addFragmentPair(FragmentPair fragmentPair) {
- fragmentPairSet.add(fragmentPair);
- }
-
- public boolean removeFragmentPair(FragmentPair fragmentPair) {
- return fragmentPairSet.remove(fragmentPair);
- }
-
- public int size() {
- return fragmentPairSet.size();
- }
-
- public Iterator<FragmentPair> getFragmentPairIterator() {
- return fragmentPairSet.iterator();
- }
-
- public boolean isEmpty() {
- return fragmentPairSet.isEmpty();
- }
- }
-
- @Override
- public void addFragment(FragmentPair fragmentPair) {
- String[] hosts = fragmentPair.getLeftFragment().getHosts();
- int[] diskIds = null;
- if (fragmentPair.getLeftFragment() instanceof FileFragment) {
- diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
- }
- for (int i = 0; i < hosts.length; i++) {
- addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
- }
- fragmentNum++;
- }
-
- private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
- // update the fragment maps per host
- String normalizeHost = NetUtils.normalizeHost(host);
- Map<Integer, FragmentsPerDisk> diskFragmentMap;
- if (fragmentHostMapping.containsKey(normalizeHost)) {
- diskFragmentMap = fragmentHostMapping.get(normalizeHost);
- } else {
- diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>();
- fragmentHostMapping.put(normalizeHost, diskFragmentMap);
- }
- FragmentsPerDisk fragmentsPerDisk;
- if (diskFragmentMap.containsKey(diskId)) {
- fragmentsPerDisk = diskFragmentMap.get(diskId);
- } else {
- fragmentsPerDisk = new FragmentsPerDisk(diskId);
- diskFragmentMap.put(diskId, fragmentsPerDisk);
- }
- fragmentsPerDisk.addFragmentPair(fragmentPair);
-
- // update the fragment maps per rack
- String rack = RackResolver.resolve(normalizeHost).getNetworkLocation();
- Set<FragmentPair> fragmentPairList;
- if (rackFragmentMapping.containsKey(rack)) {
- fragmentPairList = rackFragmentMapping.get(rack);
- } else {
- fragmentPairList = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
- rackFragmentMapping.put(rack, fragmentPairList);
- }
- fragmentPairList.add(fragmentPair);
- }
-
- @Override
- public void removeFragment(FragmentPair fragmentPair) {
- boolean removed = false;
- for (String eachHost : fragmentPair.getLeftFragment().getHosts()) {
- String normalizedHost = NetUtils.normalizeHost(eachHost);
- Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
- for (Entry<Integer, FragmentsPerDisk> entry : diskFragmentMap.entrySet()) {
- FragmentsPerDisk fragmentsPerDisk = entry.getValue();
- removed = fragmentsPerDisk.removeFragmentPair(fragmentPair);
- if (removed) {
- if (fragmentsPerDisk.size() == 0) {
- diskFragmentMap.remove(entry.getKey());
- }
- if (diskFragmentMap.size() == 0) {
- fragmentHostMapping.remove(normalizedHost);
- }
- break;
- }
- }
- String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
- if (rackFragmentMapping.containsKey(rack)) {
- Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
- fragmentPairs.remove(fragmentPair);
- if (fragmentPairs.size() == 0) {
- rackFragmentMapping.remove(rack);
- }
- }
- }
- if (removed) {
- fragmentNum--;
- }
- }
-
- /**
- * Randomly select a fragment among the fragments stored on the host.
- * @param host
- * @return a randomly selected fragment
- */
- @Override
- public FragmentPair getHostLocalFragment(String host) {
- String normalizedHost = NetUtils.normalizeHost(host);
- if (fragmentHostMapping.containsKey(normalizedHost)) {
- Collection<FragmentsPerDisk> disks = fragmentHostMapping.get(normalizedHost).values();
- Iterator<FragmentsPerDisk> diskIterator = disks.iterator();
- int randomIndex = random.nextInt(disks.size());
- FragmentsPerDisk fragmentsPerDisk = null;
- for (int i = 0; i < randomIndex; i++) {
- fragmentsPerDisk = diskIterator.next();
- }
-
- if (fragmentsPerDisk != null) {
- Iterator<FragmentPair> fragmentIterator = fragmentsPerDisk.getFragmentPairIterator();
- if (fragmentIterator.hasNext()) {
- return fragmentIterator.next();
- }
- }
- }
- return null;
- }
-
- /**
- * Randomly select a fragment among the fragments stored at the disk of the host.
- * @param host
- * @param diskId
- * @return a randomly selected fragment
- */
- @Override
- public FragmentPair getHostLocalFragment(String host, Integer diskId) {
- String normalizedHost = NetUtils.normalizeHost(host);
- if (fragmentHostMapping.containsKey(normalizedHost)) {
- Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
- if (fragmentsPerDiskMap.containsKey(diskId)) {
- FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
- if (!fragmentsPerDisk.isEmpty()) {
- return fragmentsPerDisk.getFragmentPairIterator().next();
- }
- }
- }
- return null;
- }
-
- /**
- * Randomly select a fragment among the fragments stored on nodes of the same rack with the host.
- * @param host
- * @return a randomly selected fragment
- */
- @Override
- public FragmentPair getRackLocalFragment(String host) {
- String rack = RackResolver.resolve(host).getNetworkLocation();
- if (rackFragmentMapping.containsKey(rack)) {
- Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
- if (!fragmentPairs.isEmpty()) {
- return fragmentPairs.iterator().next();
- }
- }
- return null;
- }
-
- /**
- * Randomly select a fragment among the total fragments.
- * @return a randomly selected fragment
- */
- @Override
- public FragmentPair getRandomFragment() {
- if (!fragmentHostMapping.isEmpty()) {
- return fragmentHostMapping.values().iterator().next().values().iterator().next().getFragmentPairIterator().next();
- }
- return null;
- }
-
- @Override
- public FragmentPair[] getAllFragments() {
- List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
- for (Map<Integer, FragmentsPerDisk> eachDiskFragmentMap : fragmentHostMapping.values()) {
- for (FragmentsPerDisk fragmentsPerDisk : eachDiskFragmentMap.values()) {
- fragmentPairs.addAll(fragmentsPerDisk.fragmentPairSet);
- }
- }
- return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
- }
-
- @Override
- public int size() {
- return fragmentNum;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
deleted file mode 100644
index 10d993d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-/**
- * FragmentScheduleAlgorithm is used by LazyTaskScheduler.
- * FragmentScheduleAlgorithm selects a fragment for the given argument.
- *
- * There are two implementations of DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm.
- */
-public interface FragmentScheduleAlgorithm {
- void addFragment(FragmentPair fragmentPair);
- void removeFragment(FragmentPair fragmentPair);
-
- FragmentPair getHostLocalFragment(String host);
- FragmentPair getHostLocalFragment(String host, Integer diskId);
- FragmentPair getRackLocalFragment(String host);
- FragmentPair getRandomFragment();
- FragmentPair[] getAllFragments();
-
- int size();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
deleted file mode 100644
index 820a0fb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-public class FragmentScheduleAlgorithmFactory {
-
- private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS;
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
- private static final Class<?>[] DEFAULT_PARAMS = {};
-
- public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf)
- throws IOException {
- if (CACHED_ALGORITHM_CLASS != null) {
- return CACHED_ALGORITHM_CLASS;
- } else {
- CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null,
- FragmentScheduleAlgorithm.class);
- }
-
- if (CACHED_ALGORITHM_CLASS == null) {
- throw new IOException("Scheduler algorithm is null");
- }
- return CACHED_ALGORITHM_CLASS;
- }
-
- public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) {
- T result;
- try {
- Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
- if (constructor == null) {
- constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
- constructor.setAccessible(true);
- CONSTRUCTOR_CACHE.put(clazz, constructor);
- }
- result = constructor.newInstance(new Object[]{});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return result;
- }
-
- public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException {
- return get(getScheduleAlgorithmClass(conf));
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
deleted file mode 100644
index 56cf8e5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-/**
- * GreedyFragmentScheduleAlgorithm selects a fragment considering the number of fragments that are not scheduled yet.
- * Disks of hosts have the priorities which are represented by the remaining number of fragments.
- * This algorithm selects a fragment with trying minimizing the maximum priority.
- */
-public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
- private final static Log LOG = LogFactory.getLog(GreedyFragmentScheduleAlgorithm.class);
- private final HostPriorityComparator hostComparator = new HostPriorityComparator();
- private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
- new HashMap<String, Map<Integer, FragmentsPerDisk>>();
- private Map<HostAndDisk, PrioritizedHost> totalHostPriority = new HashMap<HostAndDisk, PrioritizedHost>();
- private Map<String, Set<PrioritizedHost>> hostPriorityPerRack = new HashMap<String, Set<PrioritizedHost>>();
- private TopologyCache topologyCache = new TopologyCache();
- private int totalFragmentNum = 0;
-
- private FragmentsPerDisk getHostFragmentSet(String host, Integer diskId) {
- Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap;
- FragmentsPerDisk fragmentsPerDisk;
- if (fragmentHostMapping.containsKey(host)) {
- fragmentsPerDiskMap = fragmentHostMapping.get(host);
- } else {
- fragmentsPerDiskMap = new HashMap<Integer, FragmentsPerDisk>();
- fragmentHostMapping.put(host, fragmentsPerDiskMap);
- }
- if (fragmentsPerDiskMap.containsKey(diskId)) {
- fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
- } else {
- fragmentsPerDisk = new FragmentsPerDisk(diskId);
- fragmentsPerDiskMap.put(diskId, fragmentsPerDisk);
- }
- return fragmentsPerDisk;
- }
-
- private void updateHostPriority(HostAndDisk hostAndDisk, int priority) {
- if (priority > 0) {
- // update the priority among the total hosts
- PrioritizedHost prioritizedHost;
- if (totalHostPriority.containsKey(hostAndDisk)) {
- prioritizedHost = totalHostPriority.get(hostAndDisk);
- prioritizedHost.priority = priority;
- } else {
- prioritizedHost = new PrioritizedHost(hostAndDisk, priority);
- totalHostPriority.put(hostAndDisk, prioritizedHost);
- }
-
- // update the priority among the hosts in a rack
- String rack = topologyCache.resolve(hostAndDisk.host);
- Set<PrioritizedHost> hostsOfRack;
- if (!hostPriorityPerRack.containsKey(rack)) {
- hostsOfRack = new HashSet<PrioritizedHost>();
- hostsOfRack.add(prioritizedHost);
- hostPriorityPerRack.put(rack, hostsOfRack);
- }
- } else {
- if (totalHostPriority.containsKey(hostAndDisk)) {
- PrioritizedHost prioritizedHost = totalHostPriority.remove(hostAndDisk);
-
- String rack = topologyCache.resolve(hostAndDisk.host);
- if (hostPriorityPerRack.containsKey(rack)) {
- Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
- hostsOfRack.remove(prioritizedHost);
- if (hostsOfRack.size() == 0){
- hostPriorityPerRack.remove(rack);
- }
- }
- }
- }
- }
-
- @Override
- public void addFragment(FragmentPair fragmentPair) {
- String[] hosts = fragmentPair.getLeftFragment().getHosts();
- int[] diskIds = null;
- if (fragmentPair.getLeftFragment() instanceof FileFragment) {
- diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
- }
- for (int i = 0; i < hosts.length; i++) {
- addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
- }
- totalFragmentNum++;
- }
-
- private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
- host = topologyCache.normalize(host);
- FragmentsPerDisk fragmentsPerDisk = getHostFragmentSet(host, diskId);
- fragmentsPerDisk.addFragmentPair(fragmentPair);
-
- int priority;
- HostAndDisk hostAndDisk = new HostAndDisk(host, diskId);
- if (totalHostPriority.containsKey(hostAndDisk)) {
- priority = totalHostPriority.get(hostAndDisk).priority;
- } else {
- priority = 0;
- }
- updateHostPriority(hostAndDisk, priority+1);
- }
-
- public int size() {
- return totalFragmentNum;
- }
-
- /**
- * Selects a fragment that is stored in the given host, and replicated at the disk of the maximum
- * priority.
- * @param host
- * @return If there are fragments stored in the host, returns a fragment. Otherwise, return null.
- */
- @Override
- public FragmentPair getHostLocalFragment(String host) {
- String normalizedHost = topologyCache.normalize(host);
- if (!fragmentHostMapping.containsKey(normalizedHost)) {
- return null;
- }
-
- Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
- List<Integer> disks = Lists.newArrayList(fragmentsPerDiskMap.keySet());
- Collections.shuffle(disks);
- FragmentsPerDisk fragmentsPerDisk = null;
- FragmentPair fragmentPair = null;
-
- for (Integer diskId : disks) {
- fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
- if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) {
- fragmentPair = getBestFragment(fragmentsPerDisk);
- }
- if (fragmentPair != null) {
- return fragmentPair;
- }
- }
-
- return null;
- }
-
- /**
- * Selects a fragment that is stored at the given disk of the given host, and replicated at the disk of the maximum
- * priority.
- * @param host
- * @param diskId
- * @return If there are fragments stored at the disk of the host, returns a fragment. Otherwise, return null.
- */
- @Override
- public FragmentPair getHostLocalFragment(String host, Integer diskId) {
- String normalizedHost = NetUtils.normalizeHost(host);
- if (fragmentHostMapping.containsKey(normalizedHost)) {
- Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
- if (fragmentsPerDiskMap.containsKey(diskId)) {
- FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
- if (!fragmentsPerDisk.isEmpty()) {
- return getBestFragment(fragmentsPerDisk);
- }
- }
- }
- return null;
- }
-
- /**
- * In the descending order of priority, find a fragment that is shared by the given fragment set and the fragment set
- * of the maximal priority.
- * @param fragmentsPerDisk a fragment set
- * @return a fragment that is shared by the given fragment set and the fragment set of the maximal priority
- */
- private FragmentPair getBestFragment(FragmentsPerDisk fragmentsPerDisk) {
- // Select a fragment that is shared by host and another hostAndDisk that has the most fragments
- Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
- PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
- Arrays.sort(sortedHosts, hostComparator);
-
- for (PrioritizedHost nextHost : sortedHosts) {
- if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
- Map<Integer, FragmentsPerDisk> diskFragmentsMap = fragmentHostMapping.get(nextHost.hostAndDisk.host);
- if (diskFragmentsMap.containsKey(nextHost.hostAndDisk.diskId)) {
- Set<FragmentPair> largeFragmentPairSet = diskFragmentsMap.get(nextHost.hostAndDisk.diskId).getFragmentPairSet();
- Iterator<FragmentPair> smallFragmentSetIterator = fragmentsPerDisk.getFragmentPairIterator();
- while (smallFragmentSetIterator.hasNext()) {
- FragmentPair eachFragmentOfSmallSet = smallFragmentSetIterator.next();
- if (largeFragmentPairSet.contains(eachFragmentOfSmallSet)) {
- return eachFragmentOfSmallSet;
- }
- }
- }
- }
- }
- return null;
- }
-
- /**
- * Selects a fragment that is stored at the same rack of the given host, and replicated at the disk of the maximum
- * priority.
- * @param host
- * @return If there are fragments stored at the same rack of the given host, returns a fragment. Otherwise, return null.
- */
- public FragmentPair getRackLocalFragment(String host) {
- host = topologyCache.normalize(host);
- // Select a fragment from a host that has the most fragments in the rack
- String rack = topologyCache.resolve(host);
- Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
- if (hostsOfRack != null && hostsOfRack.size() > 0) {
- PrioritizedHost[] sortedHosts = hostsOfRack.toArray(new PrioritizedHost[hostsOfRack.size()]);
- Arrays.sort(sortedHosts, hostComparator);
- for (PrioritizedHost nextHost : sortedHosts) {
- if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
- List<FragmentsPerDisk> disks = Lists.newArrayList(fragmentHostMapping.get(nextHost.hostAndDisk.host).values());
- Collections.shuffle(disks);
-
- for (FragmentsPerDisk fragmentsPerDisk : disks) {
- if (!fragmentsPerDisk.isEmpty()) {
- return fragmentsPerDisk.getFragmentPairIterator().next();
- }
- }
- }
- }
- }
- return null;
- }
-
- /**
- * Selects a fragment from the disk of the maximum priority.
- * @return If there are remaining fragments, it returns a fragment. Otherwise, it returns null.
- */
- public FragmentPair getRandomFragment() {
- // Select a fragment from a host that has the most fragments
- Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
- PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
- Arrays.sort(sortedHosts, hostComparator);
- PrioritizedHost randomHost = sortedHosts[0];
- if (fragmentHostMapping.containsKey(randomHost.hostAndDisk.host)) {
- Iterator<FragmentsPerDisk> fragmentsPerDiskIterator = fragmentHostMapping.get(randomHost.hostAndDisk.host).values().iterator();
- if (fragmentsPerDiskIterator.hasNext()) {
- Iterator<FragmentPair> fragmentPairIterator = fragmentsPerDiskIterator.next().getFragmentPairIterator();
- if (fragmentPairIterator.hasNext()) {
- return fragmentPairIterator.next();
- }
- }
- }
- return null;
- }
-
- public FragmentPair[] getAllFragments() {
- List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
- for (Map<Integer, FragmentsPerDisk> eachValue : fragmentHostMapping.values()) {
- for (FragmentsPerDisk fragmentsPerDisk : eachValue.values()) {
- Set<FragmentPair> pairSet = fragmentsPerDisk.getFragmentPairSet();
- fragmentPairs.addAll(pairSet);
- }
- }
- return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
- }
-
- public void removeFragment(FragmentPair fragmentPair) {
- String [] hosts = fragmentPair.getLeftFragment().getHosts();
- int[] diskIds = null;
- if (fragmentPair.getLeftFragment() instanceof FileFragment) {
- diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
- }
- for (int i = 0; i < hosts.length; i++) {
- int diskId = diskIds == null ? -1 : diskIds[i];
- String normalizedHost = NetUtils.normalizeHost(hosts[i]);
- Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
-
- if (diskFragmentMap != null) {
- FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId);
- if (fragmentsPerDisk != null) {
- boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
- if (isRemoved) {
- if (fragmentsPerDisk.size() == 0) {
- diskFragmentMap.remove(diskId);
- if (diskFragmentMap.size() == 0) {
- fragmentHostMapping.remove(normalizedHost);
- }
- }
- HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId);
- if (totalHostPriority.containsKey(hostAndDisk)) {
- PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
- updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);
- }
- }
- }
- }
- }
-
- totalFragmentNum--;
- }
-
- private static class HostAndDisk {
- private String host;
- private Integer diskId;
-
- public HostAndDisk(String host, Integer diskId) {
- this.host = host;
- this.diskId = diskId;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getDiskId() {
- return diskId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(host, diskId);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof HostAndDisk) {
- HostAndDisk other = (HostAndDisk) o;
- return this.host.equals(other.host) &&
- TUtil.checkEquals(this.diskId, other.diskId);
- }
- return false;
- }
- }
-
- public static class PrioritizedHost {
- private HostAndDisk hostAndDisk;
- private int priority;
-
- public PrioritizedHost(HostAndDisk hostAndDisk, int priority) {
- this.hostAndDisk = hostAndDisk;
- this.priority = priority;
- }
-
- public PrioritizedHost(String host, Integer diskId, int priority) {
- this.hostAndDisk = new HostAndDisk(host, diskId);
- this.priority = priority;
- }
-
- public String getHost() {
- return hostAndDisk.host;
- }
-
- public Integer getDiskId() {
- return hostAndDisk.diskId;
- }
-
- public Integer getPriority() {
- return priority;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof PrioritizedHost) {
- PrioritizedHost other = (PrioritizedHost) o;
- return this.hostAndDisk.equals(other.hostAndDisk);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return hostAndDisk.hashCode();
- }
-
- @Override
- public String toString() {
- return "host: " + hostAndDisk.host + " disk: " + hostAndDisk.diskId + " priority: " + priority;
- }
- }
-
-
- public static class HostPriorityComparator implements Comparator<PrioritizedHost> {
-
- @Override
- public int compare(PrioritizedHost prioritizedHost, PrioritizedHost prioritizedHost2) {
- return prioritizedHost2.priority - prioritizedHost.priority;
- }
- }
-
-
- public static class TopologyCache {
- private Map<String, String> hostRackMap = new HashMap<String, String>();
- private Map<String, String> normalizedHostMap = new HashMap<String, String>();
-
- public String normalize(String host) {
- if (normalizedHostMap.containsKey(host)) {
- return normalizedHostMap.get(host);
- } else {
- String normalized = NetUtils.normalizeHost(host);
- normalizedHostMap.put(host, normalized);
- return normalized;
- }
- }
-
- public String resolve(String host) {
- if (hostRackMap.containsKey(host)) {
- return hostRackMap.get(host);
- } else {
- String rack = RackResolver.resolve(host).getNetworkLocation();
- hostRackMap.put(host, rack);
- return rack;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
deleted file mode 100644
index 32af17b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class LazyTaskScheduler extends AbstractTaskScheduler {
- private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class);
-
- private final TaskSchedulerContext context;
- private final Stage stage;
-
- private Thread schedulingThread;
- private volatile boolean stopEventHandling;
-
- BlockingQueue<TaskSchedulerEvent> eventQueue
- = new LinkedBlockingQueue<TaskSchedulerEvent>();
-
- private TaskRequests taskRequests;
- private FragmentScheduleAlgorithm scheduledFragments;
- private ScheduledFetches scheduledFetches;
-
- private int diskLocalAssigned = 0;
- private int hostLocalAssigned = 0;
- private int rackLocalAssigned = 0;
- private int totalAssigned = 0;
-
- private int nextTaskId = 0;
- private int containerNum;
-
- public LazyTaskScheduler(TaskSchedulerContext context, Stage stage) {
- super(LazyTaskScheduler.class.getName());
- this.context = context;
- this.stage = stage;
- }
-
- @Override
- public void init(Configuration conf) {
- taskRequests = new TaskRequests();
- try {
- scheduledFragments = FragmentScheduleAlgorithmFactory.get(conf);
- LOG.info(scheduledFragments.getClass().getSimpleName() + " is selected for the scheduling algorithm.");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- if (!context.isLeafQuery()) {
- scheduledFetches = new ScheduledFetches();
- }
-
- super.init(conf);
- }
-
- @Override
- public void start() {
- containerNum = stage.getContext().getResourceAllocator().calculateNumRequestContainers(
- stage.getContext().getQueryMasterContext().getWorkerContext(),
- context.getEstimatedTaskNum(), 512);
-
- LOG.info("Start TaskScheduler");
- this.schedulingThread = new Thread() {
- public void run() {
-
- while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- break;
- }
-
- schedule();
- }
- LOG.info("TaskScheduler schedulingThread stopped");
- }
- };
-
- this.schedulingThread.start();
- super.start();
- }
-
- private static final TaskAttemptId NULL_ATTEMPT_ID;
- public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
- static {
- ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
- NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
-
- TajoWorkerProtocol.TaskRequestProto.Builder builder =
- TajoWorkerProtocol.TaskRequestProto.newBuilder();
- builder.setId(NULL_ATTEMPT_ID.getProto());
- builder.setShouldDie(true);
- builder.setOutputTable("");
- builder.setSerializedData("");
- builder.setClusteredOutput(false);
- stopTaskRunnerReq = builder.build();
- }
-
- @Override
- public void stop() {
- stopEventHandling = true;
- schedulingThread.interrupt();
-
- // Return all of request callbacks instantly.
- for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
- req.getCallback().run(stopTaskRunnerReq);
- }
-
- LOG.info("Task Scheduler stopped");
- super.stop();
- }
-
- List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
- public void schedule() {
- if (taskRequests.size() > 0) {
- if (context.isLeafQuery()) {
- LOG.debug("Try to schedule tasks with taskRequestEvents: " +
- taskRequests.size() + ", Fragment Schedule Request: " +
- scheduledFragments.size());
- taskRequests.getTaskRequests(taskRequestEvents,
- scheduledFragments.size());
- LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
- if (taskRequestEvents.size() > 0) {
- assignLeafTasks(taskRequestEvents);
- }
- taskRequestEvents.clear();
- } else {
- LOG.debug("Try to schedule tasks with taskRequestEvents: " +
- taskRequests.size() + ", Fetch Schedule Request: " +
- scheduledFetches.size());
- taskRequests.getTaskRequests(taskRequestEvents,
- scheduledFetches.size());
- LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
- if (taskRequestEvents.size() > 0) {
- assignNonLeafTasks(taskRequestEvents);
- }
- taskRequestEvents.clear();
- }
- }
- }
-
- @Override
- public void handle(TaskSchedulerEvent event) {
- int qSize = eventQueue.size();
- if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
- }
- int remCapacity = eventQueue.remainingCapacity();
- if (remCapacity < 1000) {
- LOG.warn("Very low remaining capacity in the event-queue "
- + "of DefaultTaskScheduler: " + remCapacity);
- }
-
- if (event.getType() == EventType.T_SCHEDULE) {
- if (event instanceof FragmentScheduleEvent) {
- FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
- Collection<Fragment> rightFragments = castEvent.getRightFragments();
- if (rightFragments == null || rightFragments.isEmpty()) {
- scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null));
- } else {
- for (Fragment eachFragment: rightFragments) {
- scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment));
- }
- }
- if (castEvent.getLeftFragment() instanceof FileFragment) {
- initDiskBalancer(castEvent.getLeftFragment().getHosts(), ((FileFragment)castEvent.getLeftFragment()).getDiskIds());
- }
- } else if (event instanceof FetchScheduleEvent) {
- FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
- scheduledFetches.addFetch(castEvent.getFetches());
- } else if (event instanceof TaskAttemptToSchedulerEvent) {
- TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
- assignTask(castEvent.getContext(), castEvent.getTaskAttempt());
- }
- }
- }
-
- public void handleTaskRequestEvent(TaskRequestEvent event) {
- taskRequests.handle(event);
- }
-
- @Override
- public int remainingScheduledObjectNum() {
- if (context.isLeafQuery()) {
- return scheduledFragments.size();
- } else {
- return scheduledFetches.size();
- }
- }
-
- private Map<String, DiskBalancer> hostDiskBalancerMap = new HashMap<String, DiskBalancer>();
-
- private void initDiskBalancer(String[] hosts, int[] diskIds) {
- for (int i = 0; i < hosts.length; i++) {
- DiskBalancer diskBalancer;
- String normalized = NetUtils.normalizeHost(hosts[i]);
- if (hostDiskBalancerMap.containsKey(normalized)) {
- diskBalancer = hostDiskBalancerMap.get(normalized);
- } else {
- diskBalancer = new DiskBalancer(normalized);
- hostDiskBalancerMap.put(normalized, diskBalancer);
- }
- diskBalancer.addDiskId(diskIds[i]);
- }
- }
-
- private static class DiskBalancer {
- private HashMap<TajoContainerId, Integer> containerDiskMap = new HashMap<TajoContainerId,
- Integer>();
- private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();
- private String host;
-
- public DiskBalancer(String host){
- this.host = host;
- }
-
- public void addDiskId(Integer diskId) {
- if (!diskReferMap.containsKey(diskId)) {
- diskReferMap.put(diskId, 0);
- }
- }
-
- public Integer getDiskId(TajoContainerId containerId) {
- if (!containerDiskMap.containsKey(containerId)) {
- assignVolumeId(containerId);
- }
-
- return containerDiskMap.get(containerId);
- }
-
- public void assignVolumeId(TajoContainerId containerId){
- Map.Entry<Integer, Integer> volumeEntry = null;
-
- for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {
- if(volumeEntry == null) volumeEntry = entry;
-
- if (volumeEntry.getValue() >= entry.getValue()) {
- volumeEntry = entry;
- }
- }
-
- if(volumeEntry != null){
- diskReferMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
- LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
- + diskReferMap.get(volumeEntry.getKey()));
- containerDiskMap.put(containerId, volumeEntry.getKey());
- }
- }
-
- public String getHost() {
- return host;
- }
- }
-
- private class TaskRequests implements EventHandler<TaskRequestEvent> {
- private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
- new LinkedBlockingQueue<TaskRequestEvent>();
-
- @Override
- public void handle(TaskRequestEvent event) {
- LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
- if(stopEventHandling) {
- event.getCallback().run(stopTaskRunnerReq);
- return;
- }
- int qSize = taskRequestQueue.size();
- if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
- }
- int remCapacity = taskRequestQueue.remainingCapacity();
- if (remCapacity < 1000) {
- LOG.warn("Very low remaining capacity in the event-queue "
- + "of DefaultTaskScheduler: " + remCapacity);
- }
-
- taskRequestQueue.add(event);
- }
-
- public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
- int num) {
- taskRequestQueue.drainTo(taskRequests, num);
- }
-
- public int size() {
- return taskRequestQueue.size();
- }
- }
-
- private long adjustTaskSize() {
- long originTaskSize = context.getMasterContext().getConf().getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024;
- long fragNumPerTask = context.getTaskSize() / originTaskSize;
- if (fragNumPerTask * containerNum > remainingScheduledObjectNum()) {
- return context.getTaskSize();
- } else {
- fragNumPerTask = (long) Math.ceil((double)remainingScheduledObjectNum() / (double)containerNum);
- return originTaskSize * fragNumPerTask;
- }
- }
-
- private void assignLeafTasks(List<TaskRequestEvent> taskRequests) {
- Collections.shuffle(taskRequests);
- Iterator<TaskRequestEvent> it = taskRequests.iterator();
-
- TaskRequestEvent taskRequest;
- while (it.hasNext() && scheduledFragments.size() > 0) {
- taskRequest = it.next();
- LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
- "containerId=" + taskRequest.getContainerId());
- ContainerProxy container = context.getMasterContext().getResourceAllocator().
- getContainer(taskRequest.getContainerId());
-
- if(container == null) {
- continue;
- }
-
- String host = container.getTaskHostName();
- TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID,
- host, taskRequest.getCallback());
- Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
-
- FragmentPair fragmentPair;
- List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
- boolean diskLocal = false;
- long assignedFragmentSize = 0;
- long taskSize = adjustTaskSize();
- LOG.info("Adjusted task size: " + taskSize);
-
- TajoConf conf = stage.getContext().getConf();
- // host local, disk local
- String normalized = NetUtils.normalizeHost(host);
- Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
- if (diskId != null && diskId != -1) {
- do {
- fragmentPair = scheduledFragments.getHostLocalFragment(host, diskId);
- if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
- break;
- }
-
- if (assignedFragmentSize +
- StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
- break;
- } else {
- fragmentPairs.add(fragmentPair);
- assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
- if (fragmentPair.getRightFragment() != null) {
- assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
- }
- }
- scheduledFragments.removeFragment(fragmentPair);
- diskLocal = true;
- } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
- }
-
- if (assignedFragmentSize < taskSize) {
- // host local
- do {
- fragmentPair = scheduledFragments.getHostLocalFragment(host);
- if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
- break;
- }
-
- if (assignedFragmentSize +
- StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
- break;
- } else {
- fragmentPairs.add(fragmentPair);
- assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
- if (fragmentPair.getRightFragment() != null) {
- assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
- }
- }
- scheduledFragments.removeFragment(fragmentPair);
- diskLocal = false;
- } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
- }
-
- // rack local
- if (fragmentPairs.size() == 0) {
- fragmentPair = scheduledFragments.getRackLocalFragment(host);
-
- // random
- if (fragmentPair == null) {
- fragmentPair = scheduledFragments.getRandomFragment();
- } else {
- rackLocalAssigned++;
- }
-
- if (fragmentPair != null) {
- fragmentPairs.add(fragmentPair);
- scheduledFragments.removeFragment(fragmentPair);
- }
- } else {
- if (diskLocal) {
- diskLocalAssigned++;
- } else {
- hostLocalAssigned++;
- }
- }
-
- if (fragmentPairs.size() == 0) {
- throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
- }
-
- LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());
-
- task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
- stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
- }
- }
-
- private void assignNonLeafTasks(List<TaskRequestEvent> taskRequests) {
- Iterator<TaskRequestEvent> it = taskRequests.iterator();
-
- TaskRequestEvent taskRequest;
- while (it.hasNext()) {
- taskRequest = it.next();
- LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
-
- // random allocation
- if (scheduledFetches.size() > 0) {
- LOG.debug("Assigned based on * match");
- ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
- taskRequest.getContainerId());
- TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID,
- container.getTaskHostName(), taskRequest.getCallback());
- Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
- task.setFragment(scheduledFragments.getAllFragments());
- stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
- }
- }
- }
-
- private void assignTask(TaskAttemptScheduleContext attemptContext, TaskAttempt taskAttempt) {
- TaskAttemptId attemptId = taskAttempt.getId();
- TaskRequest taskAssign = new TaskRequestImpl(
- attemptId,
- new ArrayList<FragmentProto>(taskAttempt.getTask().getAllFragments()),
- "",
- false,
- taskAttempt.getTask().getLogicalPlan().toJson(),
- context.getMasterContext().getQueryContext(),
- stage.getDataChannel(), stage.getBlock().getEnforcer());
- if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
- taskAssign.setInterQuery();
- }
-
- if (!context.isLeafQuery()) {
- Map<String, List<FetchImpl>> fetch = scheduledFetches.getNextFetch();
- scheduledFetches.popNextFetch();
-
- for (Entry<String, List<FetchImpl>> fetchEntry : fetch.entrySet()) {
- for (FetchImpl eachValue : fetchEntry.getValue()) {
- taskAssign.addFetch(fetchEntry.getKey(), eachValue);
- }
- }
- }
-
- context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo()));
-
- totalAssigned++;
- attemptContext.getCallback().run(taskAssign.getProto());
-
- if (context.isLeafQuery()) {
- LOG.debug("DiskLocalAssigned / Total: " + diskLocalAssigned + " / " + totalAssigned);
- LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
- LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
- }
- }
-
- private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
- if (masterPlan.isRoot(block)) {
- return false;
- }
-
- ExecutionBlock parent = masterPlan.getParent(block);
- if (masterPlan.isRoot(parent) && parent.hasUnion()) {
- return false;
- }
-
- return true;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index c2e1009..9f7d3f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -30,9 +30,9 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.LazyTaskScheduler;
-import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.DefaultTaskScheduler;
import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.*;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -128,7 +128,7 @@ public class QueryMasterManagerService extends CompositeService
QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
if(queryMasterTask == null || queryMasterTask.isStopped()) {
- done.run(LazyTaskScheduler.stopTaskRunnerReq);
+ done.run(DefaultTaskScheduler.stopTaskRunnerReq);
} else {
TajoContainerId cid =
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml
index c49e8e5..db92b02 100644
--- a/tajo-core/src/main/resources/tajo-default.xml
+++ b/tajo-core/src/main/resources/tajo-default.xml
@@ -42,9 +42,4 @@
<value>org.apache.tajo.master.DefaultTaskScheduler</value>
</property>
- <property>
- <name>tajo.querymaster.lazy-task-scheduler.algorithm</name>
- <value>org.apache.tajo.master.GreedyFragmentScheduleAlgorithm</value>
- </property>
-
-</configuration>
\ No newline at end of file
+</configuration>