You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/25 11:28:27 UTC

[1/6] tajo git commit: TAJO-1259: Change tsql history behavior. (Jaewoong Jung via hyunsik)

Repository: tajo
Updated Branches:
  refs/heads/index_support 3b5c28711 -> 7a38895d4


TAJO-1259: Change tsql history behavior. (Jaewoong Jung via hyunsik)

Closes #316


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c39ed5dc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c39ed5dc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c39ed5dc

Branch: refs/heads/index_support
Commit: c39ed5dc9e5b6ec80a87baf08d79fbb640fb886f
Parents: 3c833e2
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 24 14:40:45 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 24 14:40:45 2014 +0900

----------------------------------------------------------------------
 CHANGES                                               |  2 ++
 .../main/java/org/apache/tajo/cli/tsql/TajoCli.java   | 14 +++++++++-----
 .../org/apache/tajo/cli/tsql/TajoFileHistory.java     |  8 +++-----
 3 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c39ed5dc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 29b0c0b..1bc8b9e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1259: Change tsql history behavior. (Jaewoong Jung via hyunsik)
+
     TAJO-1261: Separate query and ddl execution codes from GlobalEngine. 
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c39ed5dc/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index 6c5006e..fe9a005 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -338,6 +338,8 @@ public class TajoCli {
       String historyPath = HOME_DIR + File.separator + HISTORY_FILE;
       if ((new File(HOME_DIR)).exists()) {
         history = new TajoFileHistory(new File(historyPath));
+        history.setAutoTrim(false);
+        history.setIgnoreDuplicates(false);
         reader.setHistory(history);
       } else {
         System.err.println(ERROR_PREFIX + "home directory : '" + HOME_DIR +"' does not exist.");
@@ -391,6 +393,7 @@ public class TajoCli {
     String line;
     String currentPrompt = context.getCurrentDatabase();
     int exitCode;
+    ParsingState latestState = SimpleParser.START_STATE;
 
     sout.write("Try \\? for help.\n");
 
@@ -407,14 +410,15 @@ public class TajoCli {
         } else {
           List<ParsedResult> parsedResults = parser.parseLines(line);
 
-          if (parsedResults.size() > 0) {
-            for (ParsedResult parsed : parsedResults) {
-              history.addStatement(parsed.getHistoryStatement() + (parsed.getType() == STATEMENT ? ";" : ""));
-            }
+          if (latestState != ParsingState.TOK_START && parsedResults.size() > 0) {
+            // Add multi-line statements to history in addition to individual lines.
+            ParsedResult parsed = parsedResults.get(0);
+            history.add(parsed.getHistoryStatement() + (parsed.getType() == STATEMENT ? ";" : ""));
           }
 
           exitCode = executeParsedResults(parsedResults);
-          currentPrompt = updatePrompt(parser.getState());
+          latestState = parser.getState();
+          currentPrompt = updatePrompt(latestState);
 
           // if at least one failed
           if (exitCode != 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/c39ed5dc/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
index ec0275c..9b1a5b8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoFileHistory.java
@@ -29,11 +29,9 @@ public class TajoFileHistory extends FileHistory {
     super(file);
   }
 
+  @Override
   public void add(CharSequence item) {
-    // skip add
-  }
-
-  public void addStatement(String item) {
-    internalAdd(item);
+    // TODO: Filter out the quit command. Users wouldn't want it in the history.
+    super.add(item);
   }
 }


[2/6] tajo git commit: TAJO-1249: Tajo should check if a file format given in DDL is supported. (DaeMyung Kang via hyunsik)

Posted by ji...@apache.org.
TAJO-1249: Tajo should check if a file format given in DDL is supported. (DaeMyung Kang via hyunsik)

Closes #313


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/09cad22e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/09cad22e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/09cad22e

Branch: refs/heads/index_support
Commit: 09cad22ee2f7c30a0f28ebe50e00612777e69c86
Parents: c39ed5d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 24 15:07:44 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 24 15:07:44 2014 +0900

----------------------------------------------------------------------
 CHANGES                                           |  7 +++++--
 .../apache/tajo/engine/eval/TestPredicates.java   |  7 -------
 .../tajo/engine/planner/TestQueryValidation.java  |  6 ++++++
 .../TestQueryValidation/invalid_store_format.sql  |  1 +
 .../plan/verifier/PreLogicalPlanVerifier.java     | 18 ++++++++++++++----
 5 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1bc8b9e..23d34d4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -120,11 +120,14 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1249: Tajo should check if a file format given in DDL is supported.
+    (DaeMyung Kang via hyunsik)
+ 
     TAJO-1250: RawFileAppender occasionally causes BufferOverflowException. 
     (jinho)
 
-    TAJO-1259: A title in catalog configuration document is different from others. 
-    (Jongyoung Park via hyunsik)
+    TAJO-1259: A title in catalog configuration document is different from 
+    others. (Jongyoung Park via hyunsik)
 
     TAJO-1232: Implicit groupby queries with LIMIT lead to wrong results. 
     (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
index 79a287b..94d5e71 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestPredicates.java
@@ -403,11 +403,4 @@ public class TestPredicates extends ExprTestBase {
     testEval(schema, "table1", "t,f", "select not col1 is not true, not col2 is not false from table1",
         new String [] {"t", "t"});
   }
-
-  @Test
-  public void testCreateTableWithUnsupportedStoreType() throws IOException {
-    testSimpleEval("create table table1 (name text, age int) using RAW;",
-        new String[] {"Wrong query statement or query plan: create table table1 (name text, age int) using RAW"},
-        false);
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
index 71f3f8d..b6827a2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestQueryValidation.java
@@ -50,4 +50,10 @@ public class TestQueryValidation extends QueryTestCaseBase {
     // See TAJO-1098
     assertInvalidSQL("invalid_casewhen_1.sql");
   }
+
+  @Test
+  public void testUnsupportedStoreType() throws PlanningException, IOException {
+    // See TAJO-1249
+    assertInvalidSQL("invalid_store_format.sql");
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql b/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql
new file mode 100644
index 0000000..e5307d6
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestQueryValidation/invalid_store_format.sql
@@ -0,0 +1 @@
+create table table1 (name text, age int) using RAW;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/09cad22e/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
index e6ff0d8..c184fff 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.plan.verifier;
 
+import com.google.common.base.Preconditions;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
@@ -194,9 +195,12 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVer
     return true;
   }
 
-  private boolean assertUnsupportedStoreType(VerificationState state, String name) {
-    if (name != null && name.equals(CatalogProtos.StoreType.RAW.name())) {
-      state.addVerification(String.format("Unsupported store type :%s", name));
+  private boolean assertSupportedStoreType(VerificationState state, String name) {
+    Preconditions.checkNotNull(name);
+
+    CatalogProtos.StoreType storeType = CatalogUtil.getStoreType(name);
+    if (storeType == null || storeType == CatalogProtos.StoreType.RAW) {
+      state.addVerification(String.format("Store format %s is not supported.", name));
       return false;
     }
     return true;
@@ -248,7 +252,9 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVer
     if (!expr.isIfNotExists()) {
       assertRelationNoExistence(context, expr.getTableName());
     }
-    assertUnsupportedStoreType(context.state, expr.getStorageType());
+    if (expr.hasStorageType()) {
+      assertSupportedStoreType(context.state, expr.getStorageType());
+    }
     return expr;
   }
 
@@ -272,6 +278,10 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVer
       assertRelationExistence(context, expr.getTableName());
     }
 
+    if (expr.hasStorageType()) {
+      assertSupportedStoreType(context.state, expr.getStorageType());
+    }
+
     if (child != null && child.getType() == OpType.Projection) {
       Projection projection = (Projection) child;
 


[5/6] tajo git commit: TAJO-1266: Too many logs when writing a parquet relation. (DaeMyung Kang via jihoon)

Posted by ji...@apache.org.
TAJO-1266: Too many logs when writing a parquet relation. (DaeMyung Kang via jihoon)

Closes #320


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4c713fb4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4c713fb4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4c713fb4

Branch: refs/heads/index_support
Commit: 4c713fb40464f651994926d3f56205e2e7cc2552
Parents: 533e709
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 25 19:23:53 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 25 19:23:53 2014 +0900

----------------------------------------------------------------------
 CHANGES                                               |  3 +++
 .../thirdparty/parquet/ColumnChunkPageWriteStore.java |  6 +++---
 .../parquet/InternalParquetRecordReader.java          | 14 ++++++++------
 .../parquet/InternalParquetRecordWriter.java          |  4 ++--
 4 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 124977d..9cd2a48 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1266: Too many logs when writing a parquet relation. 
+    (DaeMyung Kang via jihoon)
+
     TAJO-1268: tajo-client module should not use UserGroupInformation. 
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
index 0dedd9b..91d4748 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
@@ -36,7 +36,7 @@ import java.io.IOException;
 import java.util.*;
 
 import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.INFO;
+import static parquet.Log.DEBUG;
 
 class ColumnChunkPageWriteStore implements PageWriteStore {
   private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
@@ -140,8 +140,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       }
       writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
       writer.endColumn();
-      if (INFO) {
-        LOG.info(
+      if (DEBUG) {
+        LOG.debug(
             String.format(
                 "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
                 buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
index 6bbd7b5..10ac6de 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -93,14 +93,14 @@ class InternalParquetRecordReader<T> {
       if (current != 0) {
         long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
         totalTimeSpentProcessingRecords += timeAssembling;
-        LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+        if (DEBUG) LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
         long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
         long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
         long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
-        LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+        if (DEBUG) LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
       }
 
-      LOG.info("at row " + current + ". reading next block");
+      if (DEBUG) LOG.debug("at row " + current + ". reading next block");
       long t0 = System.currentTimeMillis();
       PageReadStore pages = reader.readNextRowGroup();
       if (pages == null) {
@@ -109,8 +109,10 @@ class InternalParquetRecordReader<T> {
       long timeSpentReading = System.currentTimeMillis() - t0;
       totalTimeSpentReadingBytes += timeSpentReading;
       BenchmarkCounter.incrementTime(timeSpentReading);
-      LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
-      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      if (DEBUG) {
+        LOG.debug("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+        LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      }
       MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
       recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
@@ -153,7 +155,7 @@ class InternalParquetRecordReader<T> {
     for (BlockMetaData block : blocks) {
       total += block.getRowCount();
     }
-    LOG.info("RecordReader initialized will read a total of " + total + " records.");
+    if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + total + " records.");
   }
 
   private boolean contains(GroupType group, String[] path, int index) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
index 532d9a2..da57745 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -124,7 +124,7 @@ class InternalParquetRecordWriter<T> {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
       long memSize = store.memSize();
       if (memSize > blockSize) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
+        if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
         flushStore();
         initStore();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
@@ -145,7 +145,7 @@ class InternalParquetRecordWriter<T> {
 
   private void flushStore()
       throws IOException {
-    LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
+    if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
     if (store.allocatedSize() > 3 * blockSize) {
       LOG.warn("Too much memory used: " + store.memUsageString());
     }


[6/6] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7a38895d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7a38895d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7a38895d

Branch: refs/heads/index_support
Commit: 7a38895d4b15e7f7591c3591952394dbe2a5492a
Parents: 3b5c287 4c713fb
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 25 19:28:16 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 25 19:28:16 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  17 +-
 .../org/apache/tajo/cli/tools/TajoDump.java     |   8 +-
 .../java/org/apache/tajo/cli/tsql/TajoCli.java  |  14 +-
 .../apache/tajo/cli/tsql/TajoFileHistory.java   |   8 +-
 .../org/apache/tajo/client/QueryClient.java     |   4 +-
 .../org/apache/tajo/client/QueryClientImpl.java |   4 +-
 .../apache/tajo/client/SessionConnection.java   |  10 +-
 .../java/org/apache/tajo/auth/UserRoleInfo.java |  80 +++
 .../java/org/apache/tajo/util/PlatformUtil.java |  65 +++
 .../DefaultFragmentScheduleAlgorithm.java       | 251 ---------
 .../tajo/master/FragmentScheduleAlgorithm.java  |  38 --
 .../FragmentScheduleAlgorithmFactory.java       |  68 ---
 .../master/GreedyFragmentScheduleAlgorithm.java | 429 ---------------
 .../apache/tajo/master/LazyTaskScheduler.java   | 529 -------------------
 .../querymaster/QueryMasterManagerService.java  |   6 +-
 tajo-core/src/main/resources/tajo-default.xml   |   7 +-
 .../org/apache/tajo/cli/tools/TestTajoDump.java |   5 +-
 .../apache/tajo/engine/eval/TestPredicates.java |   7 -
 .../engine/planner/TestQueryValidation.java     |   6 +
 .../invalid_store_format.sql                    |   1 +
 .../plan/verifier/PreLogicalPlanVerifier.java   |  18 +-
 .../parquet/ColumnChunkPageWriteStore.java      |   6 +-
 .../parquet/InternalParquetRecordReader.java    |  14 +-
 .../parquet/InternalParquetRecordWriter.java    |   4 +-
 24 files changed, 225 insertions(+), 1374 deletions(-)
----------------------------------------------------------------------



[3/6] tajo git commit: TAJO-1268: tajo-client module should not use UserGroupInformation.

Posted by ji...@apache.org.
TAJO-1268: tajo-client module should not use UserGroupInformation.

Closes #318


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/db549655
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/db549655
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/db549655

Branch: refs/heads/index_support
Commit: db549655034c1eb79d8f5785bc00aeecfc53a593
Parents: 09cad22
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 24 17:58:25 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 24 17:58:25 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../org/apache/tajo/cli/tools/TajoDump.java     |  8 +-
 .../org/apache/tajo/client/QueryClient.java     |  4 +-
 .../org/apache/tajo/client/QueryClientImpl.java |  4 +-
 .../apache/tajo/client/SessionConnection.java   | 10 +--
 .../java/org/apache/tajo/auth/UserRoleInfo.java | 80 ++++++++++++++++++++
 .../java/org/apache/tajo/util/PlatformUtil.java | 65 ++++++++++++++++
 .../org/apache/tajo/cli/tools/TestTajoDump.java |  5 +-
 8 files changed, 163 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 23d34d4..2ee58a2 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1268: tajo-client module should not use UserGroupInformation. 
+    (hyunsik)
+
     TAJO-1259: Change tsql history behavior. (Jaewoong Jung via hyunsik)
 
     TAJO-1261: Separate query and ddl execution codes from GlobalEngine. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
index d05564a..750ead0 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
@@ -20,7 +20,7 @@ package org.apache.tajo.cli.tools;
 
 import com.google.protobuf.ServiceException;
 import org.apache.commons.cli.*;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tajo.auth.UserRoleInfo;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.DDLBuilder;
 import org.apache.tajo.catalog.TableDesc;
@@ -85,7 +85,7 @@ public class TajoDump {
     final Pair<String, Integer> hostAndPort = getConnectionAddr(conf, cmd);
     final String hostName = hostAndPort.getFirst();
     final Integer port = hostAndPort.getSecond();
-    final UserGroupInformation userInfo = UserGroupInformation.getCurrentUser();
+    final UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
 
     String baseDatabaseName = null;
     if (cmd.getArgList().size() > 0) {
@@ -117,7 +117,7 @@ public class TajoDump {
     System.exit(0);
   }
 
-  public static void dump(TajoClient client, UserGroupInformation userInfo, String baseDatabaseName,
+  public static void dump(TajoClient client, UserRoleInfo userInfo, String baseDatabaseName,
                    boolean isDumpingAllDatabases, boolean includeUserName, boolean includeDate, PrintWriter out)
       throws SQLException, ServiceException {
     printHeader(out, userInfo, includeUserName, includeDate);
@@ -136,7 +136,7 @@ public class TajoDump {
     out.flush();
   }
 
-  private static void printHeader(PrintWriter writer, UserGroupInformation userInfo, boolean includeUSerName,
+  private static void printHeader(PrintWriter writer, UserRoleInfo userInfo, boolean includeUSerName,
                                   boolean includeDate) {
     writer.write("--\n");
     writer.write("-- Tajo database dump\n");

http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
index 73a4d35..7c7db33 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.client;
 
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.auth.UserRoleInfo;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
 import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
@@ -49,7 +49,7 @@ public interface QueryClient extends Closeable {
   @Override
   public void close();
 
-  public UserGroupInformation getUserInfo();
+  public UserRoleInfo getUserInfo();
 
   /**
    * Call to QueryMaster closing query resources

http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index dc35968..f923965 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -21,11 +21,11 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.auth.UserRoleInfo;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
@@ -89,7 +89,7 @@ public class QueryClientImpl implements QueryClient {
   }
 
   @Override
-  public UserGroupInformation getUserInfo() {
+  public UserRoleInfo getUserInfo() {
     return connection.getUserInfo();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index db2bd2a..c849f2d 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,11 +21,12 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.auth.UserRoleInfo;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -33,7 +34,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
@@ -67,7 +67,7 @@ public class SessionConnection implements Closeable {
 
   private final String baseDatabase;
 
-  private final UserGroupInformation userInfo;
+  private final UserRoleInfo userInfo;
 
   volatile TajoIdProtos.SessionIdProto sessionId;
 
@@ -109,7 +109,7 @@ public class SessionConnection implements Closeable {
     int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
     // Don't share connection pool per client
     connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
-    userInfo = UserGroupInformation.getCurrentUser();
+    userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
   }
 
@@ -167,7 +167,7 @@ public class SessionConnection implements Closeable {
     return conf;
   }
 
-  public UserGroupInformation getUserInfo() {
+  public UserRoleInfo getUserInfo() {
     return userInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java b/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java
new file mode 100644
index 0000000..54c438c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/auth/UserRoleInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.auth;
+
+import org.apache.tajo.util.PlatformUtil;
+
+import java.lang.reflect.Method;
+
+public class UserRoleInfo { // immutable holder for the name of the user running this process
+  private final String username;
+
+  public UserRoleInfo(String username) {
+    this.username = username;
+  }
+
+  public String getUserName() {
+    return username;
+  }
+
+  @Override
+  public String toString() {
+    return "user=" + username;
+  }
+
+  public synchronized static UserRoleInfo getCurrentUser() { // resolves the OS-level user via the JDK's com.sun.security.auth modules
+    Class<?> c;
+    Object   o = null;
+    Method method = null;
+    String userName;
+
+    PlatformUtil.OsType osType = PlatformUtil.getOsType();
+
+    try {
+      switch (osType) { // classes are loaded reflectively, by name, so no platform-specific class is referenced at compile time
+      case WINDOWS:
+        c = Class.forName("com.sun.security.auth.module.NTSystem");
+        o = c.newInstance(); // reuse the Class loaded above instead of a second Class.forName lookup
+        method = c.getDeclaredMethod("getName");
+        break;
+      case LINUX_OR_UNIX:
+      case MAC:
+        c = Class.forName("com.sun.security.auth.module.UnixSystem");
+        o = c.newInstance();
+        method = c.getDeclaredMethod("getUsername");
+        break;
+      case SOLARIS:
+        c = Class.forName("com.sun.security.auth.module.SolarisSystem");
+        o = c.newInstance();
+        method = c.getDeclaredMethod("getUsername");
+        break;
+
+      default:
+        throw new IllegalStateException("Unknown Operating System: " + PlatformUtil.getOsName());
+      }
+
+      userName = (String) method.invoke(o);
+    } catch (Throwable t) { // any lookup/invocation failure (and the IllegalStateException above) surfaces as RuntimeException
+      throw new RuntimeException(t);
+    }
+
+    return new UserRoleInfo(userName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java
new file mode 100644
index 0000000..dabcdba
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/PlatformUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+public class PlatformUtil { // static helpers that classify the host OS from the "os.name" system property
+  public static enum OsType {
+    WINDOWS,
+    MAC,
+    LINUX_OR_UNIX,
+    SOLARIS,
+    UNKNOWN
+  }
+
+  public static String getOsName() { // lower-cased with a fixed locale so "AIX" never becomes "aıx" under a Turkish default locale
+    return System.getProperty("os.name").toLowerCase(java.util.Locale.ENGLISH);
+  }
+
+  public static OsType getOsType() { // throws RuntimeException when no OS family matches; UNKNOWN is never returned here
+    String osName = getOsName();
+
+    if (isWindows(osName)) {
+      return OsType.WINDOWS;
+    } else if (isMac(osName)) {
+      return OsType.MAC;
+    } else if (isUnix(osName)) {
+      return OsType.LINUX_OR_UNIX;
+    } else if (isSolaris(osName)) {
+      return OsType.SOLARIS;
+    } else {
+      throw new RuntimeException("Unknown OS Type: " + osName);
+    }
+  }
+
+  private static boolean isWindows(String osName) {
+    return (osName.indexOf("win") >= 0);
+  }
+
+  private static boolean isMac(String osName) {
+    return (osName.indexOf("mac") >= 0);
+  }
+
+  private static boolean isUnix(String osName) { // ">= 0" for every token: "aix" matches at index 0, which "> 0" rejected
+    return (osName.indexOf("nix") >= 0 || osName.indexOf("nux") >= 0 || osName.indexOf("aix") >= 0);
+  }
+
+  private static boolean isSolaris(String osName) {
+    return (osName.indexOf("sunos") >= 0) || (osName.indexOf("solaris") >= 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/db549655/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
index 29b7d3b..d6631f6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
@@ -18,9 +18,8 @@
 
 package org.apache.tajo.cli.tools;
 
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.cli.tools.TajoDump;
+import org.apache.tajo.auth.UserRoleInfo;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -34,7 +33,7 @@ public class TestTajoDump extends QueryTestCaseBase {
       executeString("CREATE TABLE \"" + getCurrentDatabase() +
           "\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)");
 
-      UserGroupInformation userInfo = UserGroupInformation.getCurrentUser();
+      UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
       ByteArrayOutputStream bos = new ByteArrayOutputStream();
       PrintWriter printWriter = new PrintWriter(bos);
       TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);


[4/6] tajo git commit: TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)

Posted by ji...@apache.org.
TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/533e709b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/533e709b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/533e709b

Branch: refs/heads/index_support
Commit: 533e709b75ab7cf8bc8a06b48870dcf2ebc8fe11
Parents: db54965
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 25 19:06:54 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 25 19:06:54 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../DefaultFragmentScheduleAlgorithm.java       | 251 ---------
 .../tajo/master/FragmentScheduleAlgorithm.java  |  38 --
 .../FragmentScheduleAlgorithmFactory.java       |  68 ---
 .../master/GreedyFragmentScheduleAlgorithm.java | 429 ---------------
 .../apache/tajo/master/LazyTaskScheduler.java   | 529 -------------------
 .../querymaster/QueryMasterManagerService.java  |   6 +-
 tajo-core/src/main/resources/tajo-default.xml   |   7 +-
 8 files changed, 6 insertions(+), 1324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2ee58a2..124977d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -227,6 +227,8 @@ Release 0.9.1 - unreleased
 
   TASKS
 
+    TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon)
+
     TAJO-1233: Merge hbase_storage branch to the master branch. 
     (Hyoungjun via hyunsik)
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
deleted file mode 100644
index 406550d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
-
-import java.util.*;
-import java.util.Map.Entry;
-
-/**
- * DefaultFragmentScheduleAlgorithm selects a fragment randomly for the given argument.
- * For example, when getHostLocalFragment(host, disk) is called, this algorithm randomly selects a fragment among
- * the fragments which are stored at the disk of the host specified by the arguments.
- */
-public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
-  private final static Log LOG = LogFactory.getLog(DefaultFragmentScheduleAlgorithm.class);
-  private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
-      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
-  private Map<String, Set<FragmentPair>> rackFragmentMapping =
-      new HashMap<String, Set<FragmentPair>>();
-  private int fragmentNum = 0;
-  private Random random = new Random(System.currentTimeMillis());
-
-  public static class FragmentsPerDisk {
-    private Integer diskId;
-    private Set<FragmentPair> fragmentPairSet;
-
-    public FragmentsPerDisk(Integer diskId) {
-      this.diskId = diskId;
-      this.fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
-    }
-
-    public Integer getDiskId() {
-      return diskId;
-    }
-
-    public Set<FragmentPair> getFragmentPairSet() {
-      return fragmentPairSet;
-    }
-
-    public void addFragmentPair(FragmentPair fragmentPair) {
-      fragmentPairSet.add(fragmentPair);
-    }
-
-    public boolean removeFragmentPair(FragmentPair fragmentPair) {
-      return fragmentPairSet.remove(fragmentPair);
-    }
-
-    public int size() {
-      return fragmentPairSet.size();
-    }
-
-    public Iterator<FragmentPair> getFragmentPairIterator() {
-      return fragmentPairSet.iterator();
-    }
-
-    public boolean isEmpty() {
-      return fragmentPairSet.isEmpty();
-    }
-  }
-
-  @Override
-  public void addFragment(FragmentPair fragmentPair) {
-    String[] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = null;
-    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
-      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
-    }
-    for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
-    }
-    fragmentNum++;
-  }
-
-  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
-    // update the fragment maps per host
-    String normalizeHost = NetUtils.normalizeHost(host);
-    Map<Integer, FragmentsPerDisk> diskFragmentMap;
-    if (fragmentHostMapping.containsKey(normalizeHost)) {
-      diskFragmentMap = fragmentHostMapping.get(normalizeHost);
-    } else {
-      diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>();
-      fragmentHostMapping.put(normalizeHost, diskFragmentMap);
-    }
-    FragmentsPerDisk fragmentsPerDisk;
-    if (diskFragmentMap.containsKey(diskId)) {
-      fragmentsPerDisk = diskFragmentMap.get(diskId);
-    } else {
-      fragmentsPerDisk = new FragmentsPerDisk(diskId);
-      diskFragmentMap.put(diskId, fragmentsPerDisk);
-    }
-    fragmentsPerDisk.addFragmentPair(fragmentPair);
-
-    // update the fragment maps per rack
-    String rack = RackResolver.resolve(normalizeHost).getNetworkLocation();
-    Set<FragmentPair> fragmentPairList;
-    if (rackFragmentMapping.containsKey(rack)) {
-      fragmentPairList = rackFragmentMapping.get(rack);
-    } else {
-      fragmentPairList = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
-      rackFragmentMapping.put(rack, fragmentPairList);
-    }
-    fragmentPairList.add(fragmentPair);
-  }
-
-  @Override
-  public void removeFragment(FragmentPair fragmentPair) {
-    boolean removed = false;
-    for (String eachHost : fragmentPair.getLeftFragment().getHosts()) {
-      String normalizedHost = NetUtils.normalizeHost(eachHost);
-      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
-      for (Entry<Integer, FragmentsPerDisk> entry : diskFragmentMap.entrySet()) {
-        FragmentsPerDisk fragmentsPerDisk = entry.getValue();
-        removed = fragmentsPerDisk.removeFragmentPair(fragmentPair);
-        if (removed) {
-          if (fragmentsPerDisk.size() == 0) {
-            diskFragmentMap.remove(entry.getKey());
-          }
-          if (diskFragmentMap.size() == 0) {
-            fragmentHostMapping.remove(normalizedHost);
-          }
-          break;
-        }
-      }
-      String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
-      if (rackFragmentMapping.containsKey(rack)) {
-        Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
-        fragmentPairs.remove(fragmentPair);
-        if (fragmentPairs.size() == 0) {
-          rackFragmentMapping.remove(rack);
-        }
-      }
-    }
-    if (removed) {
-      fragmentNum--;
-    }
-  }
-
-  /**
-   * Randomly select a fragment among the fragments stored on the host.
-   * @param host
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host) {
-    String normalizedHost = NetUtils.normalizeHost(host);
-    if (fragmentHostMapping.containsKey(normalizedHost)) {
-      Collection<FragmentsPerDisk> disks = fragmentHostMapping.get(normalizedHost).values();
-      Iterator<FragmentsPerDisk> diskIterator = disks.iterator();
-      int randomIndex = random.nextInt(disks.size());
-      FragmentsPerDisk fragmentsPerDisk = null;
-      for (int i = 0; i < randomIndex; i++) {
-        fragmentsPerDisk = diskIterator.next();
-      }
-
-      if (fragmentsPerDisk != null) {
-        Iterator<FragmentPair> fragmentIterator = fragmentsPerDisk.getFragmentPairIterator();
-        if (fragmentIterator.hasNext()) {
-          return fragmentIterator.next();
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Randomly select a fragment among the fragments stored at the disk of the host.
-   * @param host
-   * @param diskId
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
-    String normalizedHost = NetUtils.normalizeHost(host);
-    if (fragmentHostMapping.containsKey(normalizedHost)) {
-      Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
-      if (fragmentsPerDiskMap.containsKey(diskId)) {
-        FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-        if (!fragmentsPerDisk.isEmpty()) {
-          return fragmentsPerDisk.getFragmentPairIterator().next();
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Randomly select a fragment among the fragments stored on nodes of the same rack with the host.
-   * @param host
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getRackLocalFragment(String host) {
-    String rack = RackResolver.resolve(host).getNetworkLocation();
-    if (rackFragmentMapping.containsKey(rack)) {
-      Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
-      if (!fragmentPairs.isEmpty()) {
-        return fragmentPairs.iterator().next();
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Randomly select a fragment among the total fragments.
-   * @return a randomly selected fragment
-   */
-  @Override
-  public FragmentPair getRandomFragment() {
-    if (!fragmentHostMapping.isEmpty()) {
-      return fragmentHostMapping.values().iterator().next().values().iterator().next().getFragmentPairIterator().next();
-    }
-    return null;
-  }
-
-  @Override
-  public FragmentPair[] getAllFragments() {
-    List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
-    for (Map<Integer, FragmentsPerDisk> eachDiskFragmentMap : fragmentHostMapping.values()) {
-      for (FragmentsPerDisk fragmentsPerDisk : eachDiskFragmentMap.values()) {
-        fragmentPairs.addAll(fragmentsPerDisk.fragmentPairSet);
-      }
-    }
-    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
-  }
-
-  @Override
-  public int size() {
-    return fragmentNum;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
deleted file mode 100644
index 10d993d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-/**
- * FragmentScheduleAlgorithm is used by LazyTaskScheduler.
- * FragmentScheduleAlgorithm selects a fragment for the given argument.
- *
- * There are two implementations of DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm.
- */
-public interface FragmentScheduleAlgorithm {
-  void addFragment(FragmentPair fragmentPair);
-  void removeFragment(FragmentPair fragmentPair);
-
-  FragmentPair getHostLocalFragment(String host);
-  FragmentPair getHostLocalFragment(String host, Integer diskId);
-  FragmentPair getRackLocalFragment(String host);
-  FragmentPair getRandomFragment();
-  FragmentPair[] getAllFragments();
-
-  int size();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
deleted file mode 100644
index 820a0fb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-public class FragmentScheduleAlgorithmFactory {
-
-  private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS;
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  private static final Class<?>[] DEFAULT_PARAMS = {};
-
-  public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf)
-      throws IOException {
-    if (CACHED_ALGORITHM_CLASS != null) {
-      return CACHED_ALGORITHM_CLASS;
-    } else {
-      CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null,
-          FragmentScheduleAlgorithm.class);
-    }
-
-    if (CACHED_ALGORITHM_CLASS == null) {
-      throw new IOException("Scheduler algorithm is null");
-    }
-    return CACHED_ALGORITHM_CLASS;
-  }
-
-  public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) {
-    T result;
-    try {
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, constructor);
-      }
-      result = constructor.newInstance(new Object[]{});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return result;
-  }
-
-  public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException {
-    return get(getScheduleAlgorithmClass(conf));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
deleted file mode 100644
index 56cf8e5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-/**
- * GreedyFragmentScheduleAlgorithm selects a fragment considering the number of fragments that are not scheduled yet.
- * Disks of hosts have the priorities which are represented by the remaining number of fragments.
- * This algorithm selects a fragment with trying minimizing the maximum priority.
- */
-public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
-  private final static Log LOG = LogFactory.getLog(GreedyFragmentScheduleAlgorithm.class);
-  private final HostPriorityComparator hostComparator = new HostPriorityComparator();
-  private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
-      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
-  private Map<HostAndDisk, PrioritizedHost> totalHostPriority = new HashMap<HostAndDisk, PrioritizedHost>();
-  private Map<String, Set<PrioritizedHost>> hostPriorityPerRack = new HashMap<String, Set<PrioritizedHost>>();
-  private TopologyCache topologyCache = new TopologyCache();
-  private int totalFragmentNum = 0;
-
-  private FragmentsPerDisk getHostFragmentSet(String host, Integer diskId) {
-    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap;
-    FragmentsPerDisk fragmentsPerDisk;
-    if (fragmentHostMapping.containsKey(host)) {
-      fragmentsPerDiskMap = fragmentHostMapping.get(host);
-    } else {
-      fragmentsPerDiskMap = new HashMap<Integer, FragmentsPerDisk>();
-      fragmentHostMapping.put(host, fragmentsPerDiskMap);
-    }
-    if (fragmentsPerDiskMap.containsKey(diskId)) {
-      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-    } else {
-      fragmentsPerDisk = new FragmentsPerDisk(diskId);
-      fragmentsPerDiskMap.put(diskId, fragmentsPerDisk);
-    }
-    return fragmentsPerDisk;
-  }
-
-  private void updateHostPriority(HostAndDisk hostAndDisk, int priority) {
-    if (priority > 0) {
-      // update the priority among the total hosts
-      PrioritizedHost prioritizedHost;
-      if (totalHostPriority.containsKey(hostAndDisk)) {
-        prioritizedHost = totalHostPriority.get(hostAndDisk);
-        prioritizedHost.priority = priority;
-      } else {
-        prioritizedHost = new PrioritizedHost(hostAndDisk, priority);
-        totalHostPriority.put(hostAndDisk, prioritizedHost);
-      }
-
-      // update the priority among the hosts in a rack
-      String rack = topologyCache.resolve(hostAndDisk.host);
-      Set<PrioritizedHost> hostsOfRack;
-      if (!hostPriorityPerRack.containsKey(rack)) {
-        hostsOfRack = new HashSet<PrioritizedHost>();
-        hostsOfRack.add(prioritizedHost);
-        hostPriorityPerRack.put(rack, hostsOfRack);
-      }
-    } else {
-      if (totalHostPriority.containsKey(hostAndDisk)) {
-        PrioritizedHost prioritizedHost = totalHostPriority.remove(hostAndDisk);
-
-        String rack = topologyCache.resolve(hostAndDisk.host);
-        if (hostPriorityPerRack.containsKey(rack)) {
-          Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
-          hostsOfRack.remove(prioritizedHost);
-          if (hostsOfRack.size() == 0){
-            hostPriorityPerRack.remove(rack);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void addFragment(FragmentPair fragmentPair) {
-    String[] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = null;
-    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
-      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
-    }
-    for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
-    }
-    totalFragmentNum++;
-  }
-
-  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
-    host = topologyCache.normalize(host);
-    FragmentsPerDisk fragmentsPerDisk = getHostFragmentSet(host, diskId);
-    fragmentsPerDisk.addFragmentPair(fragmentPair);
-
-    int priority;
-    HostAndDisk hostAndDisk = new HostAndDisk(host, diskId);
-    if (totalHostPriority.containsKey(hostAndDisk)) {
-      priority = totalHostPriority.get(hostAndDisk).priority;
-    } else {
-      priority = 0;
-    }
-    updateHostPriority(hostAndDisk, priority+1);
-  }
-
-  public int size() {
-    return totalFragmentNum;
-  }
-
-  /**
-   * Selects a fragment that is stored in the given host, and replicated at the disk of the maximum
-   * priority.
-   * @param host
-   * @return If there are fragments stored in the host, returns a fragment. Otherwise, return null.
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host) {
-    String normalizedHost = topologyCache.normalize(host);
-    if (!fragmentHostMapping.containsKey(normalizedHost)) {
-      return null;
-    }
-
-    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
-    List<Integer> disks = Lists.newArrayList(fragmentsPerDiskMap.keySet());
-    Collections.shuffle(disks);
-    FragmentsPerDisk fragmentsPerDisk = null;
-    FragmentPair fragmentPair = null;
-
-    for (Integer diskId : disks) {
-      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-      if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) {
-        fragmentPair = getBestFragment(fragmentsPerDisk);
-      }
-      if (fragmentPair != null) {
-        return fragmentPair;
-      }
-    }
-
-    return null;
-  }
-
-  /**
-   * Selects a fragment that is stored at the given disk of the given host, and replicated at the disk of the maximum
-   * priority.
-   * @param host
-   * @param diskId
-   * @return If there are fragments stored at the disk of the host, returns a fragment. Otherwise, return null.
-   */
-  @Override
-  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
-    String normalizedHost = NetUtils.normalizeHost(host);
-    if (fragmentHostMapping.containsKey(normalizedHost)) {
-      Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
-      if (fragmentsPerDiskMap.containsKey(diskId)) {
-        FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
-        if (!fragmentsPerDisk.isEmpty()) {
-          return getBestFragment(fragmentsPerDisk);
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * In the descending order of priority, find a fragment that is shared by the given fragment set and the fragment set
-   * of the maximal priority.
-   * @param fragmentsPerDisk a fragment set
-   * @return a fragment that is shared by the given fragment set and the fragment set of the maximal priority
-   */
-  private FragmentPair getBestFragment(FragmentsPerDisk fragmentsPerDisk) {
-    // Select a fragment that is shared by host and another hostAndDisk that has the most fragments
-    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
-    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
-    Arrays.sort(sortedHosts, hostComparator);
-
-    for (PrioritizedHost nextHost : sortedHosts) {
-      if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
-        Map<Integer, FragmentsPerDisk> diskFragmentsMap = fragmentHostMapping.get(nextHost.hostAndDisk.host);
-        if (diskFragmentsMap.containsKey(nextHost.hostAndDisk.diskId)) {
-          Set<FragmentPair> largeFragmentPairSet = diskFragmentsMap.get(nextHost.hostAndDisk.diskId).getFragmentPairSet();
-          Iterator<FragmentPair> smallFragmentSetIterator = fragmentsPerDisk.getFragmentPairIterator();
-          while (smallFragmentSetIterator.hasNext()) {
-            FragmentPair eachFragmentOfSmallSet = smallFragmentSetIterator.next();
-            if (largeFragmentPairSet.contains(eachFragmentOfSmallSet)) {
-              return eachFragmentOfSmallSet;
-            }
-          }
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Selects a fragment that is stored at the same rack of the given host, and replicated at the disk of the maximum
-   * priority.
-   * @param host
-   * @return If there are fragments stored at the same rack of the given host, returns a fragment. Otherwise, return null.
-   */
-  public FragmentPair getRackLocalFragment(String host) {
-    host = topologyCache.normalize(host);
-    // Select a fragment from a host that has the most fragments in the rack
-    String rack = topologyCache.resolve(host);
-    Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
-    if (hostsOfRack != null && hostsOfRack.size() > 0) {
-      PrioritizedHost[] sortedHosts = hostsOfRack.toArray(new PrioritizedHost[hostsOfRack.size()]);
-      Arrays.sort(sortedHosts, hostComparator);
-      for (PrioritizedHost nextHost : sortedHosts) {
-        if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
-          List<FragmentsPerDisk> disks = Lists.newArrayList(fragmentHostMapping.get(nextHost.hostAndDisk.host).values());
-          Collections.shuffle(disks);
-
-          for (FragmentsPerDisk fragmentsPerDisk : disks) {
-            if (!fragmentsPerDisk.isEmpty()) {
-              return fragmentsPerDisk.getFragmentPairIterator().next();
-            }
-          }
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Selects a fragment from the disk of the maximum priority.
-   * @return If there are remaining fragments, it returns a fragment. Otherwise, it returns null.
-   */
-  public FragmentPair getRandomFragment() {
-    // Select a fragment from a host that has the most fragments
-    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
-    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
-    Arrays.sort(sortedHosts, hostComparator);
-    PrioritizedHost randomHost = sortedHosts[0];
-    if (fragmentHostMapping.containsKey(randomHost.hostAndDisk.host)) {
-      Iterator<FragmentsPerDisk> fragmentsPerDiskIterator = fragmentHostMapping.get(randomHost.hostAndDisk.host).values().iterator();
-      if (fragmentsPerDiskIterator.hasNext()) {
-        Iterator<FragmentPair> fragmentPairIterator = fragmentsPerDiskIterator.next().getFragmentPairIterator();
-        if (fragmentPairIterator.hasNext()) {
-          return fragmentPairIterator.next();
-        }
-      }
-    }
-    return null;
-  }
-
-  public FragmentPair[] getAllFragments() {
-    List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
-    for (Map<Integer, FragmentsPerDisk> eachValue : fragmentHostMapping.values()) {
-      for (FragmentsPerDisk fragmentsPerDisk : eachValue.values()) {
-        Set<FragmentPair> pairSet = fragmentsPerDisk.getFragmentPairSet();
-        fragmentPairs.addAll(pairSet);
-      }
-    }
-    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
-  }
-
-  public void removeFragment(FragmentPair fragmentPair) {
-    String [] hosts = fragmentPair.getLeftFragment().getHosts();
-    int[] diskIds = null;
-    if (fragmentPair.getLeftFragment() instanceof FileFragment) {
-      diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
-    }
-    for (int i = 0; i < hosts.length; i++) {
-      int diskId = diskIds == null ? -1 : diskIds[i];
-      String normalizedHost = NetUtils.normalizeHost(hosts[i]);
-      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
-
-      if (diskFragmentMap != null) {
-        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId);
-        if (fragmentsPerDisk != null) {
-          boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
-          if (isRemoved) {
-            if (fragmentsPerDisk.size() == 0) {
-              diskFragmentMap.remove(diskId);
-              if (diskFragmentMap.size() == 0) {
-                fragmentHostMapping.remove(normalizedHost);
-              }
-            }
-            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId);
-            if (totalHostPriority.containsKey(hostAndDisk)) {
-              PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
-              updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);
-            }
-          }
-        }
-      }
-    }
-
-    totalFragmentNum--;
-  }
-
-  private static class HostAndDisk {
-    private String host;
-    private Integer diskId;
-
-    public HostAndDisk(String host, Integer diskId) {
-      this.host = host;
-      this.diskId = diskId;
-    }
-
-    public String getHost() {
-      return host;
-    }
-
-    public int getDiskId() {
-      return diskId;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(host, diskId);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof HostAndDisk) {
-        HostAndDisk other = (HostAndDisk) o;
-        return this.host.equals(other.host) &&
-            TUtil.checkEquals(this.diskId, other.diskId);
-      }
-      return false;
-    }
-  }
-
-  public static class PrioritizedHost {
-    private HostAndDisk hostAndDisk;
-    private int priority;
-
-    public PrioritizedHost(HostAndDisk hostAndDisk, int priority) {
-      this.hostAndDisk = hostAndDisk;
-      this.priority = priority;
-    }
-
-    public PrioritizedHost(String host, Integer diskId, int priority) {
-      this.hostAndDisk = new HostAndDisk(host, diskId);
-      this.priority = priority;
-    }
-
-    public String getHost() {
-      return hostAndDisk.host;
-    }
-
-    public Integer getDiskId() {
-      return hostAndDisk.diskId;
-    }
-
-    public Integer getPriority() {
-      return priority;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof PrioritizedHost) {
-        PrioritizedHost other = (PrioritizedHost) o;
-        return this.hostAndDisk.equals(other.hostAndDisk);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return hostAndDisk.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return "host: " + hostAndDisk.host + " disk: " + hostAndDisk.diskId + " priority: " + priority;
-    }
-  }
-
-
-  public static class HostPriorityComparator implements Comparator<PrioritizedHost> {
-
-    @Override
-    public int compare(PrioritizedHost prioritizedHost, PrioritizedHost prioritizedHost2) {
-      return prioritizedHost2.priority - prioritizedHost.priority;
-    }
-  }
-
-
-  public static class TopologyCache {
-    private Map<String, String> hostRackMap = new HashMap<String, String>();
-    private Map<String, String> normalizedHostMap = new HashMap<String, String>();
-
-    public String normalize(String host) {
-      if (normalizedHostMap.containsKey(host)) {
-        return normalizedHostMap.get(host);
-      } else {
-        String normalized = NetUtils.normalizeHost(host);
-        normalizedHostMap.put(host, normalized);
-        return normalized;
-      }
-    }
-
-    public String resolve(String host) {
-      if (hostRackMap.containsKey(host)) {
-        return hostRackMap.get(host);
-      } else {
-        String rack = RackResolver.resolve(host).getNetworkLocation();
-        hostRackMap.put(host, rack);
-        return rack;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
deleted file mode 100644
index 32af17b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class LazyTaskScheduler extends AbstractTaskScheduler {
-  private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class);
-
-  private final TaskSchedulerContext context;
-  private final Stage stage;
-
-  private Thread schedulingThread;
-  private volatile boolean stopEventHandling;
-
-  BlockingQueue<TaskSchedulerEvent> eventQueue
-      = new LinkedBlockingQueue<TaskSchedulerEvent>();
-
-  private TaskRequests taskRequests;
-  private FragmentScheduleAlgorithm scheduledFragments;
-  private ScheduledFetches scheduledFetches;
-
-  private int diskLocalAssigned = 0;
-  private int hostLocalAssigned = 0;
-  private int rackLocalAssigned = 0;
-  private int totalAssigned = 0;
-
-  private int nextTaskId = 0;
-  private int containerNum;
-
-  public LazyTaskScheduler(TaskSchedulerContext context, Stage stage) {
-    super(LazyTaskScheduler.class.getName());
-    this.context = context;
-    this.stage = stage;
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    taskRequests  = new TaskRequests();
-    try {
-      scheduledFragments = FragmentScheduleAlgorithmFactory.get(conf);
-      LOG.info(scheduledFragments.getClass().getSimpleName() + " is selected for the scheduling algorithm.");
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    if (!context.isLeafQuery()) {
-      scheduledFetches = new ScheduledFetches();
-    }
-
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    containerNum = stage.getContext().getResourceAllocator().calculateNumRequestContainers(
-        stage.getContext().getQueryMasterContext().getWorkerContext(),
-        context.getEstimatedTaskNum(), 512);
-
-    LOG.info("Start TaskScheduler");
-    this.schedulingThread = new Thread() {
-      public void run() {
-
-        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException e) {
-            break;
-          }
-
-          schedule();
-        }
-        LOG.info("TaskScheduler schedulingThread stopped");
-      }
-    };
-
-    this.schedulingThread.start();
-    super.start();
-  }
-
-  private static final TaskAttemptId NULL_ATTEMPT_ID;
-  public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
-  static {
-    ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
-
-    TajoWorkerProtocol.TaskRequestProto.Builder builder =
-        TajoWorkerProtocol.TaskRequestProto.newBuilder();
-    builder.setId(NULL_ATTEMPT_ID.getProto());
-    builder.setShouldDie(true);
-    builder.setOutputTable("");
-    builder.setSerializedData("");
-    builder.setClusteredOutput(false);
-    stopTaskRunnerReq = builder.build();
-  }
-
-  @Override
-  public void stop() {
-    stopEventHandling = true;
-    schedulingThread.interrupt();
-
-    // Return all of request callbacks instantly.
-    for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
-      req.getCallback().run(stopTaskRunnerReq);
-    }
-
-    LOG.info("Task Scheduler stopped");
-    super.stop();
-  }
-
-  List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
-  public void schedule() {
-    if (taskRequests.size() > 0) {
-      if (context.isLeafQuery()) {
-        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-            taskRequests.size() + ", Fragment Schedule Request: " +
-            scheduledFragments.size());
-        taskRequests.getTaskRequests(taskRequestEvents,
-            scheduledFragments.size());
-        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
-        if (taskRequestEvents.size() > 0) {
-          assignLeafTasks(taskRequestEvents);
-        }
-        taskRequestEvents.clear();
-      } else {
-        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-            taskRequests.size() + ", Fetch Schedule Request: " +
-            scheduledFetches.size());
-        taskRequests.getTaskRequests(taskRequestEvents,
-            scheduledFetches.size());
-        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
-        if (taskRequestEvents.size() > 0) {
-          assignNonLeafTasks(taskRequestEvents);
-        }
-        taskRequestEvents.clear();
-      }
-    }
-  }
-
-  @Override
-  public void handle(TaskSchedulerEvent event) {
-    int qSize = eventQueue.size();
-    if (qSize != 0 && qSize % 1000 == 0) {
-      LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
-    }
-    int remCapacity = eventQueue.remainingCapacity();
-    if (remCapacity < 1000) {
-      LOG.warn("Very low remaining capacity in the event-queue "
-          + "of DefaultTaskScheduler: " + remCapacity);
-    }
-
-    if (event.getType() == EventType.T_SCHEDULE) {
-      if (event instanceof FragmentScheduleEvent) {
-        FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
-        Collection<Fragment> rightFragments = castEvent.getRightFragments();
-        if (rightFragments == null || rightFragments.isEmpty()) {
-          scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null));
-        } else {
-          for (Fragment eachFragment: rightFragments) {
-            scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment));
-          }
-        }
-        if (castEvent.getLeftFragment() instanceof FileFragment) {
-          initDiskBalancer(castEvent.getLeftFragment().getHosts(), ((FileFragment)castEvent.getLeftFragment()).getDiskIds());
-        }
-      } else if (event instanceof FetchScheduleEvent) {
-        FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
-        scheduledFetches.addFetch(castEvent.getFetches());
-      } else if (event instanceof TaskAttemptToSchedulerEvent) {
-        TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
-        assignTask(castEvent.getContext(), castEvent.getTaskAttempt());
-      }
-    }
-  }
-
-  public void handleTaskRequestEvent(TaskRequestEvent event) {
-    taskRequests.handle(event);
-  }
-
-  @Override
-  public int remainingScheduledObjectNum() {
-    if (context.isLeafQuery()) {
-      return scheduledFragments.size();
-    } else {
-      return scheduledFetches.size();
-    }
-  }
-
-  private Map<String, DiskBalancer> hostDiskBalancerMap = new HashMap<String, DiskBalancer>();
-
-  private void initDiskBalancer(String[] hosts, int[] diskIds) {
-    for (int i = 0; i < hosts.length; i++) {
-      DiskBalancer diskBalancer;
-      String normalized = NetUtils.normalizeHost(hosts[i]);
-      if (hostDiskBalancerMap.containsKey(normalized)) {
-        diskBalancer = hostDiskBalancerMap.get(normalized);
-      } else {
-        diskBalancer = new DiskBalancer(normalized);
-        hostDiskBalancerMap.put(normalized, diskBalancer);
-      }
-      diskBalancer.addDiskId(diskIds[i]);
-    }
-  }
-
-  private static class DiskBalancer {
-    private HashMap<TajoContainerId, Integer> containerDiskMap = new HashMap<TajoContainerId,
-      Integer>();
-    private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();
-    private String host;
-
-    public DiskBalancer(String host){
-      this.host = host;
-    }
-
-    public void addDiskId(Integer diskId) {
-      if (!diskReferMap.containsKey(diskId)) {
-        diskReferMap.put(diskId, 0);
-      }
-    }
-
-    public Integer getDiskId(TajoContainerId containerId) {
-      if (!containerDiskMap.containsKey(containerId)) {
-        assignVolumeId(containerId);
-      }
-
-      return containerDiskMap.get(containerId);
-    }
-
-    public void assignVolumeId(TajoContainerId containerId){
-      Map.Entry<Integer, Integer> volumeEntry = null;
-
-      for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {
-        if(volumeEntry == null) volumeEntry = entry;
-
-        if (volumeEntry.getValue() >= entry.getValue()) {
-          volumeEntry = entry;
-        }
-      }
-
-      if(volumeEntry != null){
-        diskReferMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
-        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
-            + diskReferMap.get(volumeEntry.getKey()));
-        containerDiskMap.put(containerId, volumeEntry.getKey());
-      }
-    }
-
-    public String getHost() {
-      return host;
-    }
-  }
-
-  private class TaskRequests implements EventHandler<TaskRequestEvent> {
-    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
-        new LinkedBlockingQueue<TaskRequestEvent>();
-
-    @Override
-    public void handle(TaskRequestEvent event) {
-      LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
-      if(stopEventHandling) {
-        event.getCallback().run(stopTaskRunnerReq);
-        return;
-      }
-      int qSize = taskRequestQueue.size();
-      if (qSize != 0 && qSize % 1000 == 0) {
-        LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
-      }
-      int remCapacity = taskRequestQueue.remainingCapacity();
-      if (remCapacity < 1000) {
-        LOG.warn("Very low remaining capacity in the event-queue "
-            + "of DefaultTaskScheduler: " + remCapacity);
-      }
-
-      taskRequestQueue.add(event);
-    }
-
-    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
-                                int num) {
-      taskRequestQueue.drainTo(taskRequests, num);
-    }
-
-    public int size() {
-      return taskRequestQueue.size();
-    }
-  }
-
-  private long adjustTaskSize() {
-    long originTaskSize = context.getMasterContext().getConf().getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024;
-    long fragNumPerTask = context.getTaskSize() / originTaskSize;
-    if (fragNumPerTask * containerNum > remainingScheduledObjectNum()) {
-      return context.getTaskSize();
-    } else {
-      fragNumPerTask = (long) Math.ceil((double)remainingScheduledObjectNum() / (double)containerNum);
-      return originTaskSize * fragNumPerTask;
-    }
-  }
-
-  private void assignLeafTasks(List<TaskRequestEvent> taskRequests) {
-    Collections.shuffle(taskRequests);
-    Iterator<TaskRequestEvent> it = taskRequests.iterator();
-
-    TaskRequestEvent taskRequest;
-    while (it.hasNext() && scheduledFragments.size() > 0) {
-      taskRequest = it.next();
-      LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
-          "containerId=" + taskRequest.getContainerId());
-      ContainerProxy container = context.getMasterContext().getResourceAllocator().
-          getContainer(taskRequest.getContainerId());
-
-      if(container == null) {
-        continue;
-      }
-
-      String host = container.getTaskHostName();
-      TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID,
-          host, taskRequest.getCallback());
-      Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
-
-      FragmentPair fragmentPair;
-      List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
-      boolean diskLocal = false;
-      long assignedFragmentSize = 0;
-      long taskSize = adjustTaskSize();
-      LOG.info("Adjusted task size: " + taskSize);
-
-      TajoConf conf = stage.getContext().getConf();
-      // host local, disk local
-      String normalized = NetUtils.normalizeHost(host);
-      Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
-      if (diskId != null && diskId != -1) {
-        do {
-          fragmentPair = scheduledFragments.getHostLocalFragment(host, diskId);
-          if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
-            break;
-          }
-
-          if (assignedFragmentSize +
-              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
-            break;
-          } else {
-            fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
-            if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
-            }
-          }
-          scheduledFragments.removeFragment(fragmentPair);
-          diskLocal = true;
-        } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
-      }
-
-      if (assignedFragmentSize < taskSize) {
-        // host local
-        do {
-          fragmentPair = scheduledFragments.getHostLocalFragment(host);
-          if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
-            break;
-          }
-
-          if (assignedFragmentSize +
-              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
-            break;
-          } else {
-            fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
-            if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
-            }
-          }
-          scheduledFragments.removeFragment(fragmentPair);
-          diskLocal = false;
-        } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
-      }
-
-      // rack local
-      if (fragmentPairs.size() == 0) {
-        fragmentPair = scheduledFragments.getRackLocalFragment(host);
-
-        // random
-        if (fragmentPair == null) {
-          fragmentPair = scheduledFragments.getRandomFragment();
-        } else {
-          rackLocalAssigned++;
-        }
-
-        if (fragmentPair != null) {
-          fragmentPairs.add(fragmentPair);
-          scheduledFragments.removeFragment(fragmentPair);
-        }
-      } else {
-        if (diskLocal) {
-          diskLocalAssigned++;
-        } else {
-          hostLocalAssigned++;
-        }
-      }
-
-      if (fragmentPairs.size() == 0) {
-        throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
-      }
-
-      LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());
-
-      task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
-      stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
-    }
-  }
-
-  private void assignNonLeafTasks(List<TaskRequestEvent> taskRequests) {
-    Iterator<TaskRequestEvent> it = taskRequests.iterator();
-
-    TaskRequestEvent taskRequest;
-    while (it.hasNext()) {
-      taskRequest = it.next();
-      LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
-
-      // random allocation
-      if (scheduledFetches.size() > 0) {
-        LOG.debug("Assigned based on * match");
-        ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
-            taskRequest.getContainerId());
-        TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID,
-            container.getTaskHostName(), taskRequest.getCallback());
-        Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
-        task.setFragment(scheduledFragments.getAllFragments());
-        stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
-      }
-    }
-  }
-
-  private void assignTask(TaskAttemptScheduleContext attemptContext, TaskAttempt taskAttempt) {
-    TaskAttemptId attemptId = taskAttempt.getId();
-    TaskRequest taskAssign = new TaskRequestImpl(
-        attemptId,
-        new ArrayList<FragmentProto>(taskAttempt.getTask().getAllFragments()),
-        "",
-        false,
-        taskAttempt.getTask().getLogicalPlan().toJson(),
-        context.getMasterContext().getQueryContext(),
-        stage.getDataChannel(), stage.getBlock().getEnforcer());
-    if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
-      taskAssign.setInterQuery();
-    }
-
-    if (!context.isLeafQuery()) {
-      Map<String, List<FetchImpl>> fetch = scheduledFetches.getNextFetch();
-      scheduledFetches.popNextFetch();
-
-      for (Entry<String, List<FetchImpl>> fetchEntry : fetch.entrySet()) {
-        for (FetchImpl eachValue : fetchEntry.getValue()) {
-          taskAssign.addFetch(fetchEntry.getKey(), eachValue);
-        }
-      }
-    }
-
-    context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-        attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo()));
-
-    totalAssigned++;
-    attemptContext.getCallback().run(taskAssign.getProto());
-
-    if (context.isLeafQuery()) {
-      LOG.debug("DiskLocalAssigned / Total: " + diskLocalAssigned + " / " + totalAssigned);
-      LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
-      LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
-    }
-  }
-
-  private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
-    if (masterPlan.isRoot(block)) {
-      return false;
-    }
-
-    ExecutionBlock parent = masterPlan.getParent(block);
-    if (masterPlan.isRoot(parent) && parent.hasUnion()) {
-      return false;
-    }
-
-    return true;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index c2e1009..9f7d3f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -30,9 +30,9 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.LazyTaskScheduler;
-import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.DefaultTaskScheduler;
 import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -128,7 +128,7 @@ public class QueryMasterManagerService extends CompositeService
       QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
 
       if(queryMasterTask == null || queryMasterTask.isStopped()) {
-        done.run(LazyTaskScheduler.stopTaskRunnerReq);
+        done.run(DefaultTaskScheduler.stopTaskRunnerReq);
       } else {
         TajoContainerId cid =
             queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());

http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml
index c49e8e5..db92b02 100644
--- a/tajo-core/src/main/resources/tajo-default.xml
+++ b/tajo-core/src/main/resources/tajo-default.xml
@@ -42,9 +42,4 @@
     <value>org.apache.tajo.master.DefaultTaskScheduler</value>
   </property>
 
-  <property>
-    <name>tajo.querymaster.lazy-task-scheduler.algorithm</name>
-    <value>org.apache.tajo.master.GreedyFragmentScheduleAlgorithm</value>
-  </property>
-
-</configuration>
\ No newline at end of file
+</configuration>