You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/02/02 13:29:06 UTC

[01/10] tajo git commit: TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port. (jaehwa)

Repository: tajo
Updated Branches:
  refs/heads/index_support 7cafc33b0 -> 31698f537


TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port. (jaehwa)

Closes #363


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b9719ba7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b9719ba7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b9719ba7

Branch: refs/heads/index_support
Commit: b9719ba78ef441772ee7f5ffa44844627df95891
Parents: c429c97
Author: JaeHwa Jung <bl...@apache.org>
Authored: Wed Jan 28 13:33:01 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Wed Jan 28 13:33:01 2015 +0900

----------------------------------------------------------------------
 CHANGES                                                  |  3 +++
 .../apache/tajo/storage/hbase/HBaseStorageConstants.java |  1 +
 .../apache/tajo/storage/hbase/HBaseStorageManager.java   | 11 +++++++++++
 3 files changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b9719ba7/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 453e163..3b59347 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.10.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port.
+    (jaehwa)
+
     TAJO-1309: Add missing break point in physical operator. (jinho)
 
     TAJO-1307: HBaseStorageManager need to support for users to use

http://git-wip-us.apache.org/repos/asf/tajo/blob/b9719ba7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
index 2c525a1..99140e6 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
@@ -27,6 +27,7 @@ public interface HBaseStorageConstants {
   public static final String META_SPLIT_ROW_KEYS_KEY = "hbase.split.rowkeys";
   public static final String META_SPLIT_ROW_KEYS_FILE_KEY = "hbase.split.rowkeys.file";
   public static final String META_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
+  public static final String META_ZK_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
   public static final String META_ROWKEY_DELIMITER = "hbase.rowkey.delimiter";
 
   public static final String INSERT_PUT_MODE = "tajo.hbase.insert.put.mode";

http://git-wip-us.apache.org/repos/asf/tajo/blob/b9719ba7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
index 59d1b48..2a635d8 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -312,6 +312,17 @@ public class HBaseStorageManager extends StorageManager {
           HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute.");
     }
 
+    String zkPort = hbaseConf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_CLIENT_PORT)) {
+      zkPort = tableMeta.getOption(HBaseStorageConstants.META_ZK_CLIENT_PORT, "");
+      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort);
+    }
+
+    if (zkPort == null || zkPort.trim().isEmpty()) {
+      throw new IOException("HBase mapped table is required a '" +
+        HBaseStorageConstants.META_ZK_CLIENT_PORT + "' attribute.");
+    }
+
     for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) {
       String key = eachOption.getKey();
       if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {


[06/10] tajo git commit: TAJO-1322: Invalid stored caching on StorageManager. (jinho)

Posted by ji...@apache.org.
TAJO-1322: Invalid stored caching on StorageManager. (jinho)

Closes #367


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e656ee28
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e656ee28
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e656ee28

Branch: refs/heads/index_support
Commit: e656ee2872e660d95c25e7b080064f44a8d9d01e
Parents: 4595375
Author: jhkim <jh...@apache.org>
Authored: Mon Feb 2 14:42:24 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Feb 2 14:42:24 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/storage/StorageManager.java | 40 ++++++-------
 .../tajo/storage/TestFileStorageManager.java    | 60 +++++++++++++++-----
 3 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3b59347..4a3715b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,8 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1322: Invalid stored caching on StorageManager. (jinho)
+
     TAJO-1319: Tajo can't find HBase configuration file. (jaehwa)
 
     TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 34caa80..d929591 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -42,7 +42,6 @@ import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.net.URI;
 import java.text.NumberFormat;
 import java.util.List;
 import java.util.Map;
@@ -284,14 +283,11 @@ public abstract class StorageManager {
    * @throws java.io.IOException
    */
   public static StorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException {
-    URI uri;
     TajoConf copiedConf = new TajoConf(tajoConf);
     if (warehousePath != null) {
       copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString());
     }
-    uri = TajoConf.getWarehouseDir(copiedConf).toUri();
-    String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
-    return getStorageManager(copiedConf, StoreType.CSV, key);
+    return getStorageManager(copiedConf, StoreType.CSV);
   }
 
   /**
@@ -303,7 +299,7 @@ public abstract class StorageManager {
    * @throws java.io.IOException
    */
   public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
-    if ("HBASE".equals(storeType)) {
+    if ("HBASE".equalsIgnoreCase(storeType)) {
       return getStorageManager(tajoConf, StoreType.HBASE);
     } else {
       return getStorageManager(tajoConf, StoreType.CSV);
@@ -319,7 +315,12 @@ public abstract class StorageManager {
    * @throws java.io.IOException
    */
   public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException {
-    return getStorageManager(tajoConf, storeType, null);
+    FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+    if (fileSystem != null) {
+      return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
+    } else {
+      return getStorageManager(tajoConf, storeType, null);
+    }
   }
 
   /**
@@ -331,22 +332,23 @@ public abstract class StorageManager {
    * @return
    * @throws java.io.IOException
    */
-  public static synchronized StorageManager getStorageManager (
+  private static synchronized StorageManager getStorageManager (
       TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException {
+
+    String typeName;
+    switch (storeType) {
+      case HBASE:
+        typeName = "hbase";
+        break;
+      default:
+        typeName = "hdfs";
+    }
+
     synchronized (storageManagers) {
-      String storeKey = CatalogUtil.getStoreTypeString(storeType) + managerKey;
+      String storeKey = typeName + "_" + managerKey;
       StorageManager manager = storageManagers.get(storeKey);
-      if (manager == null) {
-        String typeName = "hdfs";
-
-        switch (storeType) {
-          case HBASE:
-            typeName = "hbase";
-            break;
-          default:
-            typeName = "hdfs";
-        }
 
+      if (manager == null) {
         Class<? extends StorageManager> storageManagerClass =
             tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
index 19a39a2..c4df8d7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -38,7 +38,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
@@ -48,7 +47,6 @@ import static org.junit.Assert.*;
 public class TestFileStorageManager {
 	private TajoConf conf;
 	private static String TEST_PATH = "target/test-data/TestFileStorageManager";
-  StorageManager sm = null;
   private Path testDir;
   private FileSystem fs;
 
@@ -57,7 +55,6 @@ public class TestFileStorageManager {
 		conf = new TajoConf();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getFileStorageManager(conf, testDir);
 	}
 
 	@After
@@ -84,14 +81,17 @@ public class TestFileStorageManager {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
     fs.mkdirs(path.getParent());
-		Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, path);
+    FileStorageManager fileStorageManager = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
+
+		Appender appender = fileStorageManager.getAppender(meta, schema, path);
     appender.init();
 		for(Tuple t : tuples) {
 		  appender.addTuple(t);
 		}
 		appender.close();
 
-		Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(meta, schema, path);
+		Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
     scanner.init();
 		int i=0;
 		while(scanner.next() != null) {
@@ -110,6 +110,9 @@ public class TestFileStorageManager {
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
+    cluster.waitClusterUp();
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
 
     int testCount = 10;
     Path tablePath = new Path("/testGetSplit");
@@ -125,7 +128,8 @@ public class TestFileStorageManager {
       }
 
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath);
+      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
       schema.addColumn("id", Type.INT4);
@@ -148,10 +152,7 @@ public class TestFileStorageManager {
       assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
       fs.close();
     } finally {
-      cluster.shutdown();
-
-      File dir = new File(testDataPath);
-      dir.delete();
+      cluster.shutdown(true);
     }
   }
 
@@ -165,6 +166,10 @@ public class TestFileStorageManager {
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
 
     int testCount = 10;
     Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
@@ -177,7 +182,8 @@ public class TestFileStorageManager {
         DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
       }
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath);
+      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
       schema.addColumn("id", Type.INT4);
@@ -194,10 +200,36 @@ public class TestFileStorageManager {
       assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
       fs.close();
     } finally {
-      cluster.shutdown();
+      cluster.shutdown(true);
+    }
+  }
 
-      File dir = new File(testDataPath);
-      dir.delete();
+  @Test
+  public void testStoreType() throws Exception {
+    final Configuration hdfsConf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    TajoConf tajoConf = new TajoConf(hdfsConf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+    try {
+      /* Local FileSystem */
+      FileStorageManager sm = (FileStorageManager)StorageManager.getStorageManager(conf, StoreType.CSV);
+      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
+
+      /* Distributed FileSystem */
+      sm = (FileStorageManager)StorageManager.getStorageManager(tajoConf, StoreType.CSV);
+      assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
+      assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri());
+    } finally {
+      cluster.shutdown(true);
     }
   }
 }


[08/10] tajo git commit: TAJO-1313: Tajo-dump creates DDLs for information_schema tables

Posted by ji...@apache.org.
TAJO-1313: Tajo-dump creates DDLs for information_schema tables

Closes #365


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1e007595
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1e007595
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1e007595

Branch: refs/heads/index_support
Commit: 1e007595dc4ebbcab42d8e32e9d519e7cbf6f96b
Parents: fd73074
Author: Jihun Kang <ji...@apache.org>
Authored: Mon Feb 2 17:22:15 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Mon Feb 2 17:22:15 2015 +0900

----------------------------------------------------------------------
 CHANGES                                             |  3 +++
 .../org/apache/tajo/catalog/CatalogConstants.java   |  2 ++
 .../java/org/apache/tajo/cli/tools/TajoDump.java    | 16 +++++++++++++++-
 3 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1e007595/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8466a38..666ee9a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,9 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1313: Tajo-dump creates DDLs for information_schema tables.
+    (jihun)
+
     TAJO-1322: Invalid stored caching on StorageManager. (jinho)
 
     TAJO-1319: Tajo can't find HBase configuration file. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/1e007595/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 6ec52b9..a8c5c9b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -53,4 +53,6 @@ public class CatalogConstants {
   public static final String COL_DATABASES_PK = "DB_ID";
   public static final String COL_TABLES_PK = "TID";
   public static final String COL_TABLES_NAME = "TABLE_NAME";
+  
+  public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema";
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1e007595/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
index 750ead0..7f38a5d 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
@@ -19,11 +19,14 @@
 package org.apache.tajo.cli.tools;
 
 import com.google.protobuf.ServiceException;
+
 import org.apache.commons.cli.*;
 import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.catalog.CatalogConstants;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.DDLBuilder;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
@@ -116,6 +119,10 @@ public class TajoDump {
 
     System.exit(0);
   }
+  
+  private static boolean isAcceptableDumpingDatabase(String databaseName) {
+    return (databaseName == null || !databaseName.equalsIgnoreCase(CatalogConstants.INFORMATION_SCHEMA_DB_NAME));
+  }
 
   public static void dump(TajoClient client, UserRoleInfo userInfo, String baseDatabaseName,
                    boolean isDumpingAllDatabases, boolean includeUserName, boolean includeDate, PrintWriter out)
@@ -128,7 +135,9 @@ public class TajoDump {
       Collections.sort(sorted);
 
       for (String databaseName : sorted) {
-        dumpDatabase(client, databaseName, out);
+        if (isAcceptableDumpingDatabase(databaseName)) {
+          dumpDatabase(client, databaseName, out);
+        }
       }
     } else {
       dumpDatabase(client, baseDatabaseName, out);
@@ -166,6 +175,11 @@ public class TajoDump {
     for (String tableName : tableNames) {
       try {
         TableDesc table = client.getTableDesc(CatalogUtil.buildFQName(databaseName, tableName));
+        
+        if (table.getMeta().getStoreType() == StoreType.SYSTEM) {
+          continue;
+        }
+        
         if (table.isExternal()) {
           writer.write(DDLBuilder.buildDDLForExternalTable(table));
         } else {


[05/10] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/255046be
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/255046be
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/255046be

Branch: refs/heads/index_support
Commit: 255046be8fb2eeeb6155322bff9a2bb20ad3e6ee
Parents: 7cafc33 4595375
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Jan 30 23:32:33 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Jan 30 23:32:33 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../tajo/catalog/AbstractCatalogClient.java     |  12 +-
 .../apache/tajo/client/DummyServiceTracker.java |  84 +++
 .../apache/tajo/client/SessionConnection.java   |  44 +-
 .../org/apache/tajo/client/TajoClientImpl.java  |  38 +-
 .../apache/tajo/client/TajoHAClientUtil.java    |  14 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   7 +-
 .../java/org/apache/tajo/ha/HAServiceUtil.java  |  39 --
 .../apache/tajo/service/BaseServiceTracker.java |  97 ++++
 .../apache/tajo/service/HAServiceTracker.java   |  48 ++
 .../org/apache/tajo/service/ServiceTracker.java |  63 ++
 .../tajo/service/ServiceTrackerException.java   |  30 +
 .../tajo/service/ServiceTrackerFactory.java     |  41 ++
 .../org/apache/tajo/service/TajoMasterInfo.java |  89 +++
 .../org/apache/tajo/benchmark/BenchmarkSet.java |  15 +-
 .../main/java/org/apache/tajo/ha/HAService.java |  56 --
 .../org/apache/tajo/ha/HAServiceHDFSImpl.java   | 316 ----------
 .../org/apache/tajo/ha/HdfsServiceTracker.java  | 576 +++++++++++++++++++
 .../java/org/apache/tajo/ha/TajoMasterInfo.java |  89 ---
 .../apache/tajo/master/TajoContainerProxy.java  |  27 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  30 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  66 +--
 .../main/java/org/apache/tajo/util/JSPUtil.java |  12 +-
 .../tajo/worker/TajoResourceAllocator.java      |  28 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  46 +-
 .../tajo/worker/TajoWorkerClientService.java    |   2 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  20 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  11 +-
 .../resources/webapps/admin/catalogview.jsp     |   5 +-
 .../main/resources/webapps/admin/cluster.jsp    |   7 +-
 .../src/main/resources/webapps/admin/index.jsp  |   7 +-
 .../resources/webapps/admin/query_executor.jsp  |   5 +-
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |  10 +-
 tajo-docs/src/main/sphinx/hbase_integration.rst |   6 +-
 .../org/apache/tajo/jdbc/JdbcConnection.java    |  10 +-
 .../org/apache/tajo/jdbc/TajoStatement.java     |   4 +-
 .../org/apache/tajo/storage/StorageUtil.java    |  16 -
 .../storage/hbase/HBaseStorageConstants.java    |   1 +
 .../tajo/storage/hbase/HBaseStorageManager.java |  11 +
 39 files changed, 1175 insertions(+), 810 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 53e5ab4,718f7d6..2483c1f
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -35,8 -34,9 +34,10 @@@ import org.apache.tajo.rpc.RpcConnectio
  import org.apache.tajo.rpc.ServerCallable;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+ import org.apache.tajo.service.ServiceTracker;
+ import org.apache.tajo.service.ServiceTrackerFactory;
  import org.apache.tajo.util.ProtoUtil;
 +import org.apache.tajo.util.TUtil;
  
  import java.net.InetSocketAddress;
  import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 9522746,f8eef28..6f6059e
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@@ -32,10 -31,7 +32,8 @@@ import org.apache.tajo.catalog.TableDes
  import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.catalog.partition.PartitionMethodDesc;
  import org.apache.tajo.catalog.proto.CatalogProtos;
 +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
  import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.conf.TajoConf.ConfVars;
- import org.apache.tajo.ha.HAServiceUtil;
  import org.apache.tajo.ipc.ClientProtos.*;
  import org.apache.tajo.jdbc.TajoMemoryResultSet;
  import org.apache.tajo.jdbc.TajoResultSet;

http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------


[03/10] tajo git commit: TAJO-1306: HAServiceUtil should not directly use HDFS.

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 1496b62..a30df54 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -44,6 +44,7 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.worker.TajoWorker;
@@ -229,23 +230,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
       // worker may fail to connect existing active master. Thus,
       // if worker can't connect the master, worker should try to connect another master and
       // update master address in worker context.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-        }
-      } else {
-        rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            QueryCoordinatorProtocol.class, true);
-      }
 
+      ServiceTracker serviceTracker = workerContext.getServiceTracker();
+      rpc = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
       QueryCoordinatorProtocolService masterService = rpc.getStub();
 
       CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
@@ -353,24 +340,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       NettyClientBase tmClient = null;
       try {
-        // In TajoMaster HA mode, if backup master be active status,
-        // worker may fail to connect existing active master. Thus,
-        // if worker can't connect the master, worker should try to connect another master and
-        // update master address in worker context.
-        if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-          try {
-            tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                QueryCoordinatorProtocol.class, true);
-          } catch (Exception e) {
-            queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
-            queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-            tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                QueryCoordinatorProtocol.class, true);
-          }
-        } else {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-        }
+        tmClient = connPool.getConnection(workerContext.getServiceTracker().getUmbilicalAddress(),
+            QueryCoordinatorProtocol.class, true);
 
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
@@ -474,32 +445,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
           for(QueryMasterTask eachTask: tempTasks) {
             NettyClientBase tmClient;
             try {
-              // In TajoMaster HA mode, if backup master be active status,
-              // worker may fail to connect existing active master. Thus,
-              // if worker can't connect the master, worker should try to connect another master and
-              // update master address in worker context.
-              if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-                try {
-                  tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      QueryCoordinatorProtocol.class, true);
-                } catch (Exception e) {
-                  queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-                      HAServiceUtil.getResourceTrackerAddress(systemConf));
-                  queryMasterContext.getWorkerContext().setTajoMasterAddress(
-                      HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-                  tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      QueryCoordinatorProtocol.class, true);
-                }
-              } else {
-                tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                    QueryCoordinatorProtocol.class, true);
-              }
 
+              ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker();
+              tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(),
+                  QueryCoordinatorProtocol.class, true);
               QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
 
-              CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack =
-                  new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>();
-
+              CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
               TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
               masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
             } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 82fb37f..13f4dcc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -23,16 +23,16 @@ import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.ha.HAService;
 import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Stage;
-import org.apache.tajo.util.history.TaskHistory;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.history.StageHistory;
-import org.apache.tajo.worker.TaskRunnerHistory;
+import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.worker.TaskRunner;
+import org.apache.tajo.worker.TaskRunnerHistory;
 
 import java.text.DecimalFormat;
 import java.util.*;
@@ -191,7 +191,7 @@ public class JSPUtil {
   }
 
   public static String getMasterActiveLabel(MasterContext context) {
-    HAService haService = context.getHAService();
+    ServiceTracker haService = context.getHAService();
     String activeLabel = "";
     if (haService != null) {
       if (haService.isActiveStatus()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index dd408c9..c6a06f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -51,6 +51,7 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.net.InetSocketAddress;
@@ -274,31 +275,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       RpcConnectionPool connPool = RpcConnectionPool.getPool();
       NettyClientBase tmClient = null;
       try {
-
-        // In TajoMaster HA mode, if backup master be active status,
-        // worker may fail to connect existing active master. Thus,
-        // if worker can't connect the master, worker should try to connect another master and
-        // update master address in worker context.
-        if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-          try {
-            tmClient = connPool.getConnection(
-              queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-          } catch (Exception e) {
-            queryTaskContext.getQueryMasterContext().getWorkerContext().
-              setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
-            queryTaskContext.getQueryMasterContext().getWorkerContext().
-              setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
-            tmClient = connPool.getConnection(
-              queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-          }
-        } else {
-          tmClient = connPool.getConnection(
-            queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            QueryCoordinatorProtocol.class, true);
-        }
-
+        ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
+        tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.allocateWorkerResources(null, request, callBack);
       } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index c217e3e..7f73916 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -34,8 +34,9 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ha.TajoMasterInfo;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.service.TajoMasterInfo;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -63,7 +64,6 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -112,6 +112,8 @@ public class TajoWorker extends CompositeService {
   @Deprecated
   private boolean taskRunnerMode;
 
+  private ServiceTracker serviceTracker;
+
   private WorkerHeartbeatService workerHeartbeatThread;
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -189,6 +191,8 @@ public class TajoWorker extends CompositeService {
     this.systemConf = (TajoConf)conf;
     RackResolver.init(systemConf);
 
+    serviceTracker = ServiceTrackerFactory.get(systemConf);
+
     this.workerContext = new WorkerContext();
     this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
@@ -352,8 +356,8 @@ public class TajoWorker extends CompositeService {
 
     tajoMasterInfo = new TajoMasterInfo();
     if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-      tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+      tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress());
+      tajoMasterInfo.setWorkerResourceTrackerAddr(serviceTracker.getResourceTrackerAddress());
     } else {
       tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
           .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
@@ -425,18 +429,14 @@ public class TajoWorker extends CompositeService {
       return systemConf;
     }
 
-    public TajoWorkerManagerService getTajoWorkerManagerService() {
-      return tajoWorkerManagerService;
+    public ServiceTracker getServiceTracker() {
+      return serviceTracker;
     }
 
     public QueryMasterManagerService getQueryMasterManagerService() {
       return queryMasterManagerService;
     }
 
-    public TajoWorkerClientService getTajoWorkerClientService() {
-      return tajoWorkerClientService;
-    }
-
     public TaskRunnerManager getTaskRunnerManager() {
       return taskRunnerManager;
     }
@@ -521,10 +521,6 @@ public class TajoWorker extends CompositeService {
       TajoWorker.this.numClusterNodes.set(numClusterNodes);
     }
 
-    public int getNumClusterNodes() {
-      return TajoWorker.this.numClusterNodes.get();
-    }
-
     public void setClusterResource(ClusterResourceSummary clusterResource) {
       synchronized (numClusterNodes) {
         TajoWorker.this.clusterResource = clusterResource;
@@ -537,26 +533,6 @@ public class TajoWorker extends CompositeService {
       }
     }
 
-    public InetSocketAddress getTajoMasterAddress() {
-      return tajoMasterInfo.getTajoMasterAddress();
-    }
-
-    public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
-      tajoMasterInfo.setTajoMasterAddress(tajoMasterAddress);
-    }
-
-    public InetSocketAddress getResourceTrackerAddress() {
-      return tajoMasterInfo.getWorkerResourceTrackerAddr();
-    }
-
-    public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
-      tajoMasterInfo.setWorkerResourceTrackerAddr(workerResourceTrackerAddr);
-    }
-
-    public int getPeerRpcPort() {
-      return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort();
-    }
-
     public boolean isQueryMasterMode() {
       return queryMasterMode;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 0b815d8..c0a6453 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -119,7 +119,7 @@ public class TajoWorkerClientService extends AbstractService {
         QueryId queryId = new QueryId(request.getQueryId());
 
         QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
-        QueryHistory queryHistory;
+        QueryHistory queryHistory = null;
         if (queryMasterTask == null) {
           queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString());
         } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 676c72b..870e9a0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -35,6 +35,7 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.storage.DiskDeviceInfo;
 import org.apache.tajo.storage.DiskMountInfo;
 import org.apache.tajo.storage.DiskUtil;
@@ -183,22 +184,9 @@ public class WorkerHeartbeatService extends AbstractService {
         try {
           CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
 
-          // In TajoMaster HA mode, if backup master be active status,
-          // worker may fail to connect existing active master. Thus,
-          // if worker can't connect the master, worker should try to connect another master and
-          // update master address in worker context.
-          if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-            try {
-              rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
-            } catch (Exception e) {
-              context.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
-              context.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-              rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
-            }
-          } else {
-            rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
-          }
-
+          ServiceTracker serviceTracker = context.getServiceTracker();
+          rmClient = connectionPool.getConnection(serviceTracker.getResourceTrackerAddress(),
+              TajoResourceTrackerProtocol.class, true);
           TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
           resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 9d9f39c..4b76c73 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -26,6 +26,8 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rule.*;
 import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
@@ -45,13 +47,8 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
     InetSocketAddress masterAddress = null;
     
     try {
-      if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        masterAddress = HAServiceUtil.getMasterUmbilicalAddress(tajoConf);
-      } else {
-        masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
-      }
-      masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
-      
+      ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf);
+      masterClient = pool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
       masterClient.getStub();
     } finally {
       if (masterClient != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
index bc770d7..1ff81a6 100644
--- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
@@ -24,12 +24,13 @@
 <%@ page import="org.apache.tajo.catalog.TableDesc" %>
 <%@ page import="org.apache.tajo.catalog.partition.PartitionMethodDesc" %>
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
 <%@ page import="org.apache.tajo.util.FileUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="java.util.Collection" %>
 <%@ page import="java.util.List" %>
 <%@ page import="java.util.Map" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
   CatalogService catalog = master.getCatalog();
@@ -59,7 +60,7 @@
   //TODO filter with database
   Collection<String> tableNames = catalog.getAllTableNames(selectedDatabase);
 
-  HAService haService = master.getContext().getHAService();
+  ServiceTracker haService = master.getContext().getHAService();
   String activeLabel = "";
   if (haService != null) {
     if (haService.isActiveStatus()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 1fb5e40..aca1153 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -21,8 +21,8 @@
 
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
 <%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
-<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
@@ -30,6 +30,7 @@
 <%@ page import="org.apache.tajo.util.TUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -75,7 +76,7 @@
   String deadWorkersHtml = deadWorkers.isEmpty() ? "0": "<font color='red'>" + deadWorkers.size() + "</font>";
   String deadQueryMastersHtml = deadQueryMasters.isEmpty() ? "0": "<font color='red'>" + deadQueryMasters.size() + "</font>";
 
-  HAService haService = master.getContext().getHAService();
+  ServiceTracker haService = master.getContext().getHAService();
   List<TajoMasterInfo> masters = TUtil.newList();
 
   String activeLabel = "";

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index d98abfd..0a0558e 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -23,8 +23,8 @@
 <%@ page import="org.apache.tajo.conf.TajoConf" %>
 <%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %>
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
-<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
 <%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
@@ -35,6 +35,7 @@
 <%@ page import="java.util.Collection" %>
 <%@ page import="java.util.Date" %>
 <%@ page import="java.util.Map" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -85,7 +86,7 @@
   String numDeadWorkersHtml = numDeadWorkers == 0 ? "0" : "<font color='red'>" + numDeadWorkers + "</font>";
   String numDeadQueryMastersHtml = numDeadQueryMasters == 0 ? "0" : "<font color='red'>" + numDeadQueryMasters + "</font>";
 
-  HAService haService = master.getContext().getHAService();
+  ServiceTracker haService = master.getContext().getHAService();
   List<TajoMasterInfo> masters = TUtil.newList();
 
   String activeLabel = "";

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
index 82836ac..a0f9a0a 100644
--- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
@@ -19,13 +19,14 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="javax.xml.ws.Service" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
 
-  HAService haService = master.getContext().getHAService();
+  ServiceTracker haService = master.getContext().getHAService();
   String activeLabel = "";
   if (haService != null) {
       if (haService.isActiveStatus()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index a6f9f74..f8642ed 100644
--- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.ha;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
@@ -29,6 +27,8 @@ import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 import org.junit.Test;
 
 import static junit.framework.TestCase.assertEquals;
@@ -37,8 +37,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestHAServiceHDFSImpl  {
-  private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class);
-
   private TajoTestingCluster cluster;
   private TajoMaster backupMaster;
 
@@ -60,7 +58,8 @@ public class TestHAServiceHDFSImpl  {
     try {
       FileSystem fs = cluster.getDefaultFileSystem();
 
-      masterAddress = HAServiceUtil.getMasterUmbilicalName(conf).split(":")[0];
+      ServiceTracker serviceTracker = ServiceTrackerFactory.get(conf);
+      masterAddress = serviceTracker.getUmbilicalAddress().getHostName();
 
       setConfiguration();
 
@@ -112,6 +111,7 @@ public class TestHAServiceHDFSImpl  {
       masterAddress + ":" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS,
       masterAddress + ":" + NetUtils.getFreeSocketPort());
+
     conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true);
 
     //Client API service RPC Server

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index f00dc25..2264b62 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -23,12 +23,11 @@ import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.TajoConstants;
-import org.apache.tajo.client.CatalogAdminClient;
-import org.apache.tajo.client.QueryClient;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.jdbc.util.QueryStringDecoder;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
 import java.net.URI;
@@ -109,7 +108,8 @@ public class JdbcConnection implements Connection {
     }
 
     try {
-      tajoClient = new TajoClientImpl(hostName, port, databaseName);
+      ServiceTracker serviceTracker = new DummyServiceTracker(NetUtils.createSocketAddr(hostName, port));
+      tajoClient = new TajoClientImpl(tajoConf, serviceTracker, databaseName);
     } catch (Exception e) {
       throw new SQLException("Cannot create TajoClient instance:" + e.getMessage(), "TAJO-002");
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index eb7f8c9..7f89b46 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -118,7 +118,7 @@ public class TajoStatement implements Statement {
   @Override
   public ResultSet executeQuery(String sql) throws SQLException {
     if (isClosed) {
-      throw new SQLFeatureNotSupportedException("Can't execute after statement has been closed");
+      throw new SQLException("Can't execute after statement has been closed");
     }
 
     try {
@@ -130,7 +130,7 @@ public class TajoStatement implements Statement {
         return tajoClient.executeQueryAndGetResult(sql);
       }
     } catch (Exception e) {
-      throw new SQLFeatureNotSupportedException(e.getMessage(), e);
+      throw new SQLException(e.getMessage(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index af3d623..68e96d8 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -38,14 +38,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class StorageUtil extends StorageConstants {
-  public static int getRowByteSize(Schema schema) {
-    int sum = 0;
-    for(Column col : schema.getColumns()) {
-      sum += StorageUtil.getColByteSize(col);
-    }
-
-    return sum;
-  }
 
   public static int getColByteSize(Column col) {
     switch (col.getDataType().getType()) {
@@ -83,14 +75,6 @@ public class StorageUtil extends StorageConstants {
         return 0;
     }
   }
-
-  public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
-    FileSystem fs = tableroot.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
-    FileUtil.writeProto(out, meta.getProto());
-    out.flush();
-    out.close();
-  }
   
   public static Path concatPath(String parent, String...childs) {
     return concatPath(new Path(parent), childs);


[10/10] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/31698f53
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/31698f53
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/31698f53

Branch: refs/heads/index_support
Commit: 31698f537f8c19ee0fc16968add6e29229ddc2d0
Parents: 255046b 58bbb1b
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Feb 2 21:28:56 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Feb 2 21:28:56 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  9 ++
 .../apache/tajo/catalog/CatalogConstants.java   |  2 +
 .../org/apache/tajo/cli/tools/TajoDump.java     | 16 +++-
 .../cli/tsql/DefaultTajoCliOutputFormatter.java |  3 +-
 .../org/apache/tajo/master/QueryInProgress.java |  2 +-
 .../org/apache/tajo/master/QueryManager.java    |  4 -
 .../tajo/master/TajoMasterClientService.java    |  8 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  1 -
 .../java/org/apache/tajo/querymaster/Stage.java | 23 ++++-
 .../main/proto/QueryCoordinatorProtocol.proto   |  1 -
 .../org/apache/tajo/TajoTestingCluster.java     | 96 ++++++++------------
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   | 42 +++------
 .../tajo/engine/query/TestInsertQuery.java      |  3 +-
 .../apache/tajo/querymaster/TestKillQuery.java  | 20 +++-
 .../TestTajoCli/testNonForwardQueryPause.result |  3 +-
 .../org/apache/tajo/storage/StorageManager.java | 40 ++++----
 .../tajo/storage/TestFileStorageManager.java    | 60 +++++++++---
 17 files changed, 192 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/31698f53/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index c7df801,a8c5c9b..f19f77f
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@@ -52,6 -52,7 +52,8 @@@ public class CatalogConstants 
    public static final String COL_TABLESPACE_PK = "SPACE_ID";
    public static final String COL_DATABASES_PK = "DB_ID";
    public static final String COL_TABLES_PK = "TID";
 +  public static final String COL_INDEXES_PK = "INDEX_ID";
    public static final String COL_TABLES_NAME = "TABLE_NAME";
+   
+   public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema";
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/31698f53/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/31698f53/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------


[07/10] tajo git commit: TAJO-1323: Cleanup the unstable test case. (jinho)

Posted by ji...@apache.org.
TAJO-1323: Cleanup the unstable test case. (jinho)

Closes #368


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fd73074f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fd73074f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fd73074f

Branch: refs/heads/index_support
Commit: fd73074f2198f92516a3eb8f5e786d31c1c071a5
Parents: e656ee2
Author: jhkim <jh...@apache.org>
Authored: Mon Feb 2 14:56:14 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Feb 2 14:56:14 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../cli/tsql/DefaultTajoCliOutputFormatter.java |  3 +-
 .../java/org/apache/tajo/querymaster/Stage.java | 23 ++++-
 .../org/apache/tajo/TajoTestingCluster.java     | 96 ++++++++------------
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   | 42 +++------
 .../tajo/engine/query/TestInsertQuery.java      |  3 +-
 .../apache/tajo/querymaster/TestKillQuery.java  | 20 +++-
 .../TestTajoCli/testNonForwardQueryPause.result |  3 +-
 8 files changed, 98 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4a3715b..8466a38 100644
--- a/CHANGES
+++ b/CHANGES
@@ -327,6 +327,8 @@ Release 0.10.0 - unreleased
 
   TASKS
 
+    TAJO-1323: Cleanup the unstable test case. (jinho)
+
     TAJO-1295: Remove legacy worker.dataserver package and its unit tests.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
index 17c94b9..5cbe77b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
@@ -37,6 +37,7 @@ public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter {
   private boolean printPause;
   private boolean printErrorTrace;
   private String nullChar;
+  public static char QUIT_COMMAND = 'q';
 
   @Override
   public void init(TajoCli.TajoCliContext context) {
@@ -123,7 +124,7 @@ public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter {
         }
         sout.flush();
         if (sin != null) {
-          if (sin.read() == 'q') {
+          if (sin.read() == QUIT_COMMAND) {
             endOfTuple = false;
             sout.println();
             break;

http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 208d4a6..5673d5b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -185,7 +185,7 @@ public class Stage implements EventHandler<StageEvent> {
 
           // Transitions from KILL_WAIT state
           .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
-              StageEventType.SQ_CONTAINER_ALLOCATED,
+              EnumSet.of(StageEventType.SQ_START, StageEventType.SQ_CONTAINER_ALLOCATED),
               CONTAINERS_CANCEL_TRANSITION)
           .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
               EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition())
@@ -414,6 +414,18 @@ public class Stage implements EventHandler<StageEvent> {
     return totalScheduledObjectsCount;
   }
 
+  public int getKilledObjectCount() {
+    return killedObjectCount;
+  }
+
+  public int getFailedObjectCount() {
+    return failedObjectCount;
+  }
+
+  public int getCompletedTaskCount() {
+    return completedTaskCount;
+  }
+
   public ExecutionBlock getBlock() {
     return block;
   }
@@ -793,7 +805,14 @@ public class Stage implements EventHandler<StageEvent> {
                                 stage.taskScheduler.start();
                                 allocateContainers(stage);
                               } else {
-                                stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+                                /* all tasks are killed before stage are inited */
+                                if (stage.getTotalScheduledObjectsCount() == stage.getCompletedTaskCount()) {
+                                  stage.eventHandler.handle(
+                                      new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+                                } else {
+                                  stage.eventHandler.handle(
+                                      new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+                                }
                               }
                             }
                           } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 8714fc4..0d3d660 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -77,17 +77,13 @@ public class TajoTestingCluster {
   private TajoMaster tajoMaster;
   private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
   private boolean standbyWorkerMode = false;
+  private boolean isDFSRunning = false;
+  private boolean isTajoClusterRunning = false;
+  private boolean isCatalogServerRunning = false;
 
-	// If non-null, then already a cluster running.
 	private File clusterTestBuildDir = null;
 
 	/**
-	 * System property key to get test directory value.
-	 * Name is as it is because mini dfs has hard-codings to put test data here.
-	 */
-	public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA;
-
-	/**
 	 * Default parent directory for test output.
 	 */
 	public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
@@ -111,6 +107,7 @@ public class TajoTestingCluster {
     this.conf = new TajoConf();
     this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode);
 
+    initTestDir();
     setTestingFlagProperties();
     initPropertiesAndConfigs();
   }
@@ -177,22 +174,19 @@ public class TajoTestingCluster {
 		return this.conf;
 	}
 
-	public void initTestDir() {
-		if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
-			clusterTestBuildDir = setupClusterTestBuildDir();
-			System.setProperty(TEST_DIRECTORY_KEY,
-          clusterTestBuildDir.getAbsolutePath());
-		}
-	}
+  public void initTestDir() {
+    if (clusterTestBuildDir == null) {
+      clusterTestBuildDir = setupClusterTestBuildDir();
+    }
+  }
 
 	/**
 	 * @return Where to write test data on local filesystem; usually
 	 * {@link #DEFAULT_TEST_DIRECTORY}
 	 * @see #setupClusterTestBuildDir()
 	 */
-	public static File getTestDir() {
-		return new File(System.getProperty(TEST_DIRECTORY_KEY,
-			DEFAULT_TEST_DIRECTORY));
+	public File getTestDir() {
+		return clusterTestBuildDir;
 	}
 
 	/**
@@ -202,10 +196,10 @@ public class TajoTestingCluster {
 	 * @see #setupClusterTestBuildDir()
 	 */
 	public static File getTestDir(final String subdirName) {
-		return new File(getTestDir(), subdirName);
+		return new File(new File(DEFAULT_TEST_DIRECTORY), subdirName);
   }
 
-	public File setupClusterTestBuildDir() {
+	public static File setupClusterTestBuildDir() {
 		String randomStr = UUID.randomUUID().toString();
 		String dirStr = getTestDir(randomStr).toString();
 		File dir = new File(dirStr).getAbsoluteFile();
@@ -243,9 +237,6 @@ public class TajoTestingCluster {
                                             File dir,
                                             final String hosts[])
       throws IOException {
-    if (dir == null) {
-      dir = setupClusterTestBuildDir();
-    }
 
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.toString());
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
@@ -263,7 +254,7 @@ public class TajoTestingCluster {
     this.defaultFS = this.dfsCluster.getFileSystem();
     this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
     this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
-
+    isDFSRunning = true;
     return this.dfsCluster;
   }
 
@@ -300,22 +291,20 @@ public class TajoTestingCluster {
   // Catalog Section
   ////////////////////////////////////////////////////////
   public MiniCatalogServer startCatalogCluster() throws Exception {
-    TajoConf c = getConfiguration();
+    if(isCatalogServerRunning) throw new IOException("Catalog Cluster already running");
 
-    if(clusterTestBuildDir == null) {
-      clusterTestBuildDir = setupClusterTestBuildDir();
-    }
+    TajoConf c = getConfiguration();
 
     conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
     conf.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db");
-    LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
+    LOG.info("Apache Derby repository is set to " + conf.get(CatalogConstants.CATALOG_URI));
     conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
 
     catalogServer = new MiniCatalogServer(conf);
     CatalogServer catServer = catalogServer.getCatalogServer();
     InetSocketAddress sockAddr = catServer.getBindAddress();
     c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
-
+    isCatalogServerRunning = true;
     return this.catalogServer;
   }
 
@@ -323,6 +312,7 @@ public class TajoTestingCluster {
     if (catalogServer != null) {
       this.catalogServer.shutdown();
     }
+    isCatalogServerRunning = false;
   }
 
   public MiniCatalogServer getMiniCatalogCluster() {
@@ -352,10 +342,10 @@ public class TajoTestingCluster {
       c.setVar(ConfVars.ROOT_DIR,
           getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
     } else {
-      c.setVar(ConfVars.ROOT_DIR, clusterTestBuildDir.getAbsolutePath() + "/tajo");
+      c.setVar(ConfVars.ROOT_DIR, testBuildDir.getAbsolutePath() + "/tajo");
     }
 
-    setupCatalogForTesting(c, clusterTestBuildDir);
+    setupCatalogForTesting(c, testBuildDir);
 
     tajoMaster = new TajoMaster();
     tajoMaster.init(c);
@@ -374,6 +364,7 @@ public class TajoTestingCluster {
     if(standbyWorkerMode) {
       startTajoWorkers(numSlaves);
     }
+    isTajoClusterRunning = true;
     LOG.info("Mini Tajo cluster is up");
     LOG.info("====================================================================================");
     LOG.info("=                           MiniTajoCluster starts up                              =");
@@ -473,8 +464,8 @@ public class TajoTestingCluster {
   /**
    * @throws java.io.IOException If a cluster -- dfs or engine -- already running.
    */
-  void isRunningCluster(String passedBuildPath) throws IOException {
-    if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
+  void isRunningCluster() throws IOException {
+    if (!isTajoClusterRunning && !isCatalogServerRunning && !isDFSRunning) return;
     throw new IOException("Cluster already running at " +
         this.clusterTestBuildDir);
   }
@@ -501,19 +492,13 @@ public class TajoTestingCluster {
     LOG.info("Starting up minicluster with 1 master(s) and " +
         numSlaves + " worker(s) and " + numDataNodes + " datanode(s)");
 
-    // If we already put up a cluster, fail.
-    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
-    isRunningCluster(testBuildPath);
-    if (testBuildPath != null) {
-      LOG.info("Using passed path: " + testBuildPath);
+    // If we already bring up the cluster, fail.
+    isRunningCluster();
+    if (clusterTestBuildDir != null) {
+      LOG.info("Using passed path: " + clusterTestBuildDir);
     }
 
-    // Make a new random dir to home everything in.  Set it as system property.
-    // minidfs reads home from system property.
-    this.clusterTestBuildDir = testBuildPath == null?
-        setupClusterTestBuildDir() : new File(testBuildPath);
-
-    startMiniDFSCluster(numDataNodes, setupClusterTestBuildDir(), dataNodeHosts);
+    startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts);
     this.dfsCluster.waitClusterUp();
 
     hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
@@ -559,20 +544,11 @@ public class TajoTestingCluster {
   }
 
   public void startMiniClusterInLocal(final int numSlaves) throws Exception {
-    // If we already put up a cluster, fail.
-    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
-    isRunningCluster(testBuildPath);
-    if (testBuildPath != null) {
-      LOG.info("Using passed path: " + testBuildPath);
-    }
-
-    // Make a new random dir to home everything in.  Set it as system property.
-    // minidfs reads home from system property.
-    this.clusterTestBuildDir = testBuildPath == null?
-        setupClusterTestBuildDir() : new File(testBuildPath);
+    isRunningCluster();
 
-    System.setProperty(TEST_DIRECTORY_KEY,
-        this.clusterTestBuildDir.getAbsolutePath());
+    if (clusterTestBuildDir != null) {
+      LOG.info("Using passed path: " + clusterTestBuildDir);
+    }
 
     startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true);
   }
@@ -592,6 +568,7 @@ public class TajoTestingCluster {
 
     if(this.catalogServer != null) {
       shutdownCatalogCluster();
+      isCatalogServerRunning = false;
     }
 
     if(this.yarnCluster != null) {
@@ -612,6 +589,7 @@ public class TajoTestingCluster {
       } catch (IOException e) {
         System.err.println("error closing file system: " + e);
       }
+      isDFSRunning = false;
     }
 
     if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
@@ -630,6 +608,7 @@ public class TajoTestingCluster {
     }
 
     LOG.info("Minicluster is down");
+    isTajoClusterRunning = false;
   }
 
   public static TajoClient newTajoClient() throws Exception {
@@ -803,7 +782,8 @@ public class TajoTestingCluster {
       } catch (InterruptedException e) {
       }
       if (++i > 200) {
-        throw new IOException("Timed out waiting");
+        throw new IOException("Timed out waiting. expected: " + expected +
+            ", actual: " + query != null ? String.valueOf(query.getSynchronizedState()) : String.valueOf(query));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index aff1677..e014b52 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -38,9 +38,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintWriter;
+import java.io.*;
 import java.net.URL;
 
 import static org.junit.Assert.*;
@@ -77,7 +75,8 @@ public class TestTajoCli {
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws IOException {
+    out.close();
     if (tajoCli != null) {
       tajoCli.close();
     }
@@ -350,38 +349,27 @@ public class TestTajoCli {
     assertOutputResult(new String(out.toByteArray()));
   }
 
-  @Test
+  @Test(timeout = 3000)
   public void testNonForwardQueryPause() throws Exception {
     final String sql = "select * from default.lineitem";
+    TajoCli cli = null;
     try {
       TableDesc tableDesc = cluster.getMaster().getCatalog().getTableDesc("default", "lineitem");
       assertNotNull(tableDesc);
       assertEquals(0L, tableDesc.getStats().getNumRows().longValue());
-      setVar(tajoCli, SessionVars.CLI_PAGE_ROWS, "2");
-      setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
-      Thread t = new Thread() {
-        public void run() {
-          try {
-            tajoCli.executeScript(sql);
-          } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-          }
-        }
-      };
-      t.start();
+
+      InputStream testInput = new ByteArrayInputStream(new byte[]{(byte) DefaultTajoCliOutputFormatter.QUIT_COMMAND});
+      cli = new TajoCli(cluster.getConfiguration(), new String[]{}, testInput, out);
+      setVar(cli, SessionVars.CLI_PAGE_ROWS, "2");
+      setVar(cli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
+
+      cli.executeScript(sql);
+
       String consoleResult;
-      while (true) {
-        Thread.sleep(3 * 1000);
-        consoleResult = new String(out.toByteArray());
-        if (consoleResult.indexOf("row") >= 0) {
-          t.interrupt();
-          break;
-        }
-      }
+      consoleResult = new String(out.toByteArray());
       assertOutputResult(consoleResult);
     } finally {
-      setVar(tajoCli, SessionVars.CLI_PAGE_ROWS, "100");
+      cli.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index cc7dced..0799d22 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -660,12 +660,11 @@ public class TestInsertQuery extends QueryTestCaseBase {
 
   @Test
   public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception {
-    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery2");
     ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
     res.close();
     CatalogService catalog = testingCluster.getMaster().getCatalog();
     assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
-
     res = executeString("insert overwrite into " + tableName + " (col1, col3) select 1::INT4, 'test';");
     res.close();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 42ad8da..0574bea 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -127,12 +127,24 @@ public class TestKillQuery {
     Query q = queryMasterTask.getQuery();
     q.handle(new QueryEvent(queryId, QueryEventType.KILL));
 
-    try{
+    try {
       cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
-    } finally {
       assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+    } catch (Exception e) {
+      e.printStackTrace();
+      if (stage != null) {
+        System.err.println(String.format("Stage: [%s] (Total: %d, Complete: %d, Success: %d, Killed: %d, Failed: %d)",
+            stage.getId().toString(),
+            stage.getTotalScheduledObjectsCount(),
+            stage.getCompletedTaskCount(),
+            stage.getSucceededObjectCount(),
+            stage.getKilledObjectCount(),
+            stage.getFailedObjectCount()));
+      }
+      throw e;
+    } finally {
+      queryMasterTask.stop();
     }
-    queryMasterTask.stop();
   }
 
   @Test
@@ -145,6 +157,8 @@ public class TestKillQuery {
     QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
     Query query = qmt.getQuery();
 
+    // wait for a stage created
+    cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
     query.handle(new QueryEvent(queryId, QueryEventType.KILL));
 
     try{

http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result b/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
index e9485d0..d4ba604 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
@@ -2,4 +2,5 @@ l_orderkey,  l_partkey,  l_suppkey,  l_linenumber,  l_quantity,  l_extendedprice
 -------------------------------
 1,  1,  7706,  1,  17.0,  21168.23,  0.04,  0.02,  N,  O,  1996-03-13,  1996-02-12,  1996-03-22,  DELIVER IN PERSON,  TRUCK,  egular courts above the
 1,  1,  7311,  2,  36.0,  45983.16,  0.09,  0.06,  N,  O,  1996-04-12,  1996-02-28,  1996-04-20,  TAKE BACK RETURN,  MAIL,  ly final dependencies: slyly bold 
-(2 rows, continue... 'q' is quit)
\ No newline at end of file
+(2 rows, continue... 'q' is quit)
+(unknown row number, , 604 B selected)
\ No newline at end of file


[04/10] tajo git commit: TAJO-1306: HAServiceUtil should not directly use HDFS.

Posted by ji...@apache.org.
TAJO-1306: HAServiceUtil should not directly use HDFS.

Closes #358


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4595375f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4595375f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4595375f

Branch: refs/heads/index_support
Commit: 4595375f7e6b62436e0d4bf88a8aef1ca680c726
Parents: 015913b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 28 09:23:20 2015 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 28 09:23:20 2015 -0800

----------------------------------------------------------------------
 .../tajo/catalog/AbstractCatalogClient.java     |  12 +-
 .../apache/tajo/client/DummyServiceTracker.java |  84 +++
 .../apache/tajo/client/SessionConnection.java   |  44 +-
 .../org/apache/tajo/client/TajoClientImpl.java  |  38 +-
 .../apache/tajo/client/TajoHAClientUtil.java    |  14 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   7 +-
 .../java/org/apache/tajo/ha/HAServiceUtil.java  |  39 --
 .../apache/tajo/service/BaseServiceTracker.java |  97 ++++
 .../apache/tajo/service/HAServiceTracker.java   |  48 ++
 .../org/apache/tajo/service/ServiceTracker.java |  63 ++
 .../tajo/service/ServiceTrackerException.java   |  30 +
 .../tajo/service/ServiceTrackerFactory.java     |  41 ++
 .../org/apache/tajo/service/TajoMasterInfo.java |  89 +++
 .../org/apache/tajo/benchmark/BenchmarkSet.java |  15 +-
 .../main/java/org/apache/tajo/ha/HAService.java |  56 --
 .../org/apache/tajo/ha/HAServiceHDFSImpl.java   | 316 ----------
 .../org/apache/tajo/ha/HdfsServiceTracker.java  | 576 +++++++++++++++++++
 .../java/org/apache/tajo/ha/TajoMasterInfo.java |  89 ---
 .../apache/tajo/master/TajoContainerProxy.java  |  27 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  30 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  66 +--
 .../main/java/org/apache/tajo/util/JSPUtil.java |  12 +-
 .../tajo/worker/TajoResourceAllocator.java      |  28 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  46 +-
 .../tajo/worker/TajoWorkerClientService.java    |   2 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  20 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  11 +-
 .../resources/webapps/admin/catalogview.jsp     |   5 +-
 .../main/resources/webapps/admin/cluster.jsp    |   7 +-
 .../src/main/resources/webapps/admin/index.jsp  |   7 +-
 .../resources/webapps/admin/query_executor.jsp  |   5 +-
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |  10 +-
 .../org/apache/tajo/jdbc/JdbcConnection.java    |  10 +-
 .../org/apache/tajo/jdbc/TajoStatement.java     |   4 +-
 .../org/apache/tajo/storage/StorageUtil.java    |  16 -
 35 files changed, 1156 insertions(+), 808 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 1a2fd44..718f7d6 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,12 +29,13 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.net.InetSocketAddress;
@@ -48,6 +49,7 @@ import java.util.List;
 public abstract class AbstractCatalogClient implements CatalogService {
   private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
 
+  protected ServiceTracker serviceTracker;
   protected RpcConnectionPool pool;
   protected InetSocketAddress catalogServerAddr;
   protected TajoConf conf;
@@ -57,6 +59,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
     this.pool = RpcConnectionPool.getPool();
     this.catalogServerAddr = catalogServerAddr;
+    this.serviceTracker = ServiceTrackerFactory.get(conf);
     this.conf = conf;
   }
 
@@ -64,14 +67,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
     if (catalogServerAddr == null) {
       return null;
     } else {
+
       if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         return catalogServerAddr;
       } else {
-        if (!HAServiceUtil.isMasterAlive(catalogServerAddr, conf)) {
-          return HAServiceUtil.getCatalogAddress(conf);
-        } else {
-          return catalogServerAddr;
-        }
+        return serviceTracker.getCatalogAddress();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
new file mode 100644
index 0000000..762c2e7
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class DummyServiceTracker implements ServiceTracker {
+  private InetSocketAddress address;
+
+  public DummyServiceTracker(InetSocketAddress address) {
+    this.address = address;
+  }
+
+  @Override
+  public boolean isHighAvailable() {
+    return false;
+  }
+
+  @Override
+  public InetSocketAddress getUmbilicalAddress() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public InetSocketAddress getClientServiceAddress() {
+    return address;
+  }
+
+  @Override
+  public InetSocketAddress getResourceTrackerAddress() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public InetSocketAddress getCatalogAddress() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public void register() throws IOException {
+  }
+
+  @Override
+  public void delete() throws IOException {
+  }
+
+  @Override
+  public boolean isActiveStatus() {
+    return true;
+  }
+
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    throw new UnsupportedException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 5490be4..3e2b9cc 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,12 +21,10 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.auth.UserRoleInfo;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -34,6 +32,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
@@ -58,8 +57,6 @@ public class SessionConnection implements Closeable {
 
   private final TajoConf conf;
 
-  final InetSocketAddress tajoMasterAddr;
-
   final RpcConnectionPool connPool;
 
   private final String baseDatabase;
@@ -73,41 +70,29 @@ public class SessionConnection implements Closeable {
   /** session variable cache */
   private final Map<String, String> sessionVarsCache = new HashMap<String, String>();
 
-
-  public SessionConnection(TajoConf conf) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
-  }
-
-  public SessionConnection(TajoConf conf, @Nullable String baseDatabase) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
-  }
-
-  public SessionConnection(InetSocketAddress addr) throws IOException {
-    this(new TajoConf(), addr, null);
-  }
-
-  public SessionConnection(String hostname, int port, String baseDatabase) throws IOException {
-    this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase);
-  }
+  private ServiceTracker serviceTracker;
 
   /**
    * Connect to TajoMaster
    *
    * @param conf TajoConf
-   * @param addr TajoMaster address
+   * @param tracker ServiceTracker used to look up the TajoMaster client RPC address
    * @param baseDatabase The base database name. It is case sensitive. If it is null,
    *                     the 'default' database will be used.
    * @throws java.io.IOException
    */
-  public SessionConnection(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
+  public SessionConnection(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase)
+      throws IOException {
+
     this.conf = conf;
     this.conf.set("tajo.disk.scheduler.report.interval", "0");
-    this.tajoMasterAddr = addr;
     int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
     // Don't share connection pool per client
     connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
     userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
+
+    this.serviceTracker = tracker;
   }
 
   public Map<String, String> getClientSideSessionVars() {
@@ -140,7 +125,8 @@ public class SessionConnection implements Closeable {
   public boolean isConnected() {
     if(!closed.get()){
       try {
-        return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
+        return connPool.getConnection(serviceTracker.getClientServiceAddress(),
+            TajoMasterClientProtocol.class, false).isConnected();
       } catch (Throwable e) {
         return false;
       }
@@ -309,15 +295,7 @@ public class SessionConnection implements Closeable {
   }
 
   protected InetSocketAddress getTajoMasterAddr() {
-    if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      return tajoMasterAddr;
-    } else {
-      if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
-        return HAServiceUtil.getMasterClientAddress(conf);
-      } else {
-        return tajoMasterAddr;
-      }
-    }
+    return serviceTracker.getClientServiceAddress();
   }
 
   protected void checkSessionAndGet(NettyClientBase client) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 8eafc91..f8eef28 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -32,8 +32,6 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.jdbc.TajoResultSet;
@@ -41,7 +39,8 @@ import org.apache.tajo.rule.EvaluationContext;
 import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
-import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -56,41 +55,30 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
   QueryClient queryClient;
   CatalogAdminClient catalogClient;
 
-  public TajoClientImpl(TajoConf conf) throws IOException {
-    this(conf, TajoHAClientUtil.getRpcClientAddress(conf), null);
-  }
-
-  public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
-    this(conf, TajoHAClientUtil.getRpcClientAddress(conf), baseDatabase);
-  }
-
-  public TajoClientImpl(InetSocketAddress addr) throws IOException {
-    this(new TajoConf(), addr, null);
-  }
-
   /**
    * Connect to TajoMaster
    *
    * @param conf TajoConf
-   * @param addr TajoMaster address
+   * @param tracker ServiceTracker used to discover the Tajo client RPC address
    * @param baseDatabase The base database name. It is case sensitive. If it is null,
    *                     the 'default' database will be used.
    * @throws java.io.IOException
    */
-  public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
-    super(conf, addr, baseDatabase);
+  public TajoClientImpl(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) throws IOException {
+    super(conf, tracker, baseDatabase);
+
     this.queryClient = new QueryClientImpl(this);
     this.catalogClient = new CatalogAdminClientImpl(this);
-    
+
     diagnoseTajoClient();
   }
 
-  public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException {
-    super(hostName, port, baseDatabase);
-    this.queryClient = new QueryClientImpl(this);
-    this.catalogClient = new CatalogAdminClientImpl(this);
-    
-    diagnoseTajoClient();
+  public TajoClientImpl(TajoConf conf) throws IOException {
+    this(conf, ServiceTrackerFactory.get(conf), null);
+  }
+
+  public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
+    this(conf, ServiceTrackerFactory.get(conf), baseDatabase);
   }
   
   private void diagnoseTajoClient() throws EvaluationFailedException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
index 12a9ec8..7267b10 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -40,10 +40,8 @@ import com.google.protobuf.ServiceException;
 import org.apache.tajo.cli.tsql.TajoCli.TajoCliContext;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 public class TajoHAClientUtil {
   /**
@@ -65,6 +63,7 @@ public class TajoHAClientUtil {
       TajoCliContext context) throws IOException, ServiceException {
 
     if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+
       if (!HAServiceUtil.isMasterAlive(conf.getVar(
         TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
         TajoClient tajoClient = null;
@@ -85,15 +84,4 @@ public class TajoHAClientUtil {
       return client;
     }
   }
-
-
-  public static InetSocketAddress getRpcClientAddress(TajoConf conf) {
-    if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      return NetUtils.createSocketAddr(HAServiceUtil.getMasterClientName(conf));
-    } else {
-      return NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
-        .TAJO_MASTER_CLIENT_RPC_ADDRESS));
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1bb96bc..fe5ff54 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ConfigKey;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.service.BaseServiceTracker;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.NumberUtil;
 import org.apache.tajo.util.TUtil;
@@ -134,10 +135,14 @@ public class TajoConf extends Configuration {
         Validators.networkAddr()),
     TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()),
 
-    // Tajo Master HA Configurations
+    // High availability configurations
     TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
     TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
 
+    // Service discovery
+    DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()),
+    HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"),
+
     // Resource tracker service
     RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003",
         Validators.networkAddr()),

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
index 52c2ade..7001228 100644
--- a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.ha;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
@@ -34,15 +32,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class HAServiceUtil {
-  private static Log LOG = LogFactory.getLog(HAServiceUtil.class);
-
-  public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.MASTER_UMBILICAL_RPC_ADDRESS);
-  }
-
-  public static String getMasterUmbilicalName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
-  }
 
   public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
     return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS);
@@ -52,30 +41,6 @@ public class HAServiceUtil {
     return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
   }
 
-  public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
-  }
-
-  public static String getResourceTrackerName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
-  }
-
-  public static InetSocketAddress getCatalogAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.CATALOG_ADDRESS);
-  }
-
-  public static String getCatalogName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
-  }
-
-  public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
-    return getMasterAddress(conf, HAConstants.MASTER_INFO_ADDRESS);
-  }
-
-  public static String getMasterInfoName(TajoConf conf) {
-    return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
-  }
-
   public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
     InetSocketAddress masterAddress = null;
 
@@ -153,10 +118,6 @@ public class HAServiceUtil {
     return masterAddress;
   }
 
-  public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
-    return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
-  }
-
   public static boolean isMasterAlive(String masterName, TajoConf conf) {
     boolean isAlive = true;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
new file mode 100644
index 0000000..bf7fd2c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+/** Default non-HA ServiceTracker: every master address is read once from TajoConf in the constructor. */
+public class BaseServiceTracker implements ServiceTracker {
+  private final TajoConf conf;
+  private TajoMasterInfo tajoMasterInfo; // the single master described by the configuration
+  private List<TajoMasterInfo> tajoMasterInfos; // list built from tajoMasterInfo alone; returned by getMasters()
+
+  @SuppressWarnings("unused") // presumably invoked only via reflection (cf. ServiceTrackerFactory) - confirm
+  public BaseServiceTracker(TajoConf conf) {
+    this.conf = conf;
+
+    tajoMasterInfo = new TajoMasterInfo();
+    tajoMasterInfo.setActive(true);
+    tajoMasterInfo.setAvailable(true);
+    tajoMasterInfo.setTajoMasterAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+    tajoMasterInfo.setTajoClientAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+    tajoMasterInfo.setWorkerResourceTrackerAddr(conf.getSocketAddrVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+    tajoMasterInfo.setCatalogAddress(conf.getSocketAddrVar(TajoConf.ConfVars.CATALOG_ADDRESS));
+    tajoMasterInfo.setWebServerAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS));
+
+    tajoMasterInfos = TUtil.newList(tajoMasterInfo);
+  }
+
+  @Override
+  public boolean isHighAvailable() {
+    return false;
+  }
+
+  @Override
+  public InetSocketAddress getUmbilicalAddress() {
+    return tajoMasterInfo.getTajoMasterAddress(); // set from TAJO_MASTER_UMBILICAL_RPC_ADDRESS
+  }
+
+  @Override
+  public InetSocketAddress getClientServiceAddress() {
+    return tajoMasterInfo.getTajoClientAddress(); // set from TAJO_MASTER_CLIENT_RPC_ADDRESS
+  }
+
+  @Override
+  public InetSocketAddress getResourceTrackerAddress() {
+    return tajoMasterInfo.getWorkerResourceTrackerAddr(); // set from RESOURCE_TRACKER_RPC_ADDRESS
+  }
+
+  @Override
+  public InetSocketAddress getCatalogAddress() {
+    return tajoMasterInfo.getCatalogAddress(); // set from CATALOG_ADDRESS
+  }
+
+  @Override
+  public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+    return tajoMasterInfo.getWebServerAddress(); // set from TAJO_MASTER_INFO_ADDRESS
+  }
+
+  @Override
+  public void register() throws IOException { // no-op
+  }
+
+  @Override
+  public void delete() throws IOException { // no-op
+  }
+
+  @Override
+  public boolean isActiveStatus() {
+    return true; // the configured master is always treated as active
+  }
+
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    return tajoMasterInfos;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
new file mode 100644
index 0000000..c808537
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.hadoop.net.NetUtils;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * Base class for high-availability {@link ServiceTracker} implementations. Besides
+ * reporting HA mode, it offers a TCP connect probe that callers can use to test
+ * whether an address is reachable.
+ */
+public abstract class HAServiceTracker implements ServiceTracker {
+
+  static SocketFactory socketFactory = SocketFactory.getDefault();
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isHighAvailable() {
+    return true;
+  }
+
+  /**
+   * Probes {@code address} by opening a TCP connection to it.
+   *
+   * @param address the endpoint to probe
+   * @return {@code true} if the connection was established, {@code false} if creating
+   *         or connecting the socket failed for any reason
+   */
+  public static boolean checkConnection(InetSocketAddress address) {
+    // Timeout passed to NetUtils.connect (milliseconds per Hadoop's NetUtils documentation).
+    final int connectionTimeout = 10;
+
+    Socket socket = null;
+    try {
+      socket = socketFactory.createSocket();
+      NetUtils.connect(socket, address, connectionTimeout);
+      return true;
+    } catch (Exception e) {
+      // Any failure (refused, timed out, unresolved host, ...) means "not alive".
+      return false;
+    } finally {
+      // The socket exists only for the probe; close it so repeated checks do not
+      // leak file descriptors.
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // Best effort: the probe result has already been decided.
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
new file mode 100644
index 0000000..73ff112
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+/** Lookup interface for TajoMaster service addresses, plus master registration and status queries. */
+public interface ServiceTracker {
+  /** @return whether this tracker operates in high-availability mode. */
+  public abstract boolean isHighAvailable();
+  /** @return the TajoMaster umbilical RPC address. */
+  public abstract InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException;
+  /** @return the TajoMaster client RPC address. */
+  public abstract InetSocketAddress getClientServiceAddress() throws ServiceTrackerException;
+  /** @return the resource tracker RPC address. */
+  public abstract InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException;
+  /** @return the catalog address. */
+  public abstract InetSocketAddress getCatalogAddress() throws ServiceTrackerException;
+  /** @return the TajoMaster HTTP info (web server) address. */
+  public abstract InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException;
+
+  /**
+   * Add master name to shared storage.
+   */
+  public void register() throws IOException;
+
+
+  /**
+   * Delete master name from shared storage.
+   *
+   */
+  public void delete() throws IOException;
+
+  /**
+   * Reports the role of the current master.
+   * @return True if current master is an active master.
+   */
+  public boolean isActiveStatus();
+
+  /**
+   * Lists the masters known to this tracker.
+   * @return the list of all masters
+   * @throws IOException if the master list cannot be retrieved
+   */
+  public List<TajoMasterInfo> getMasters() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
new file mode 100644
index 0000000..3407c51
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+/** Unchecked exception declared by the address lookups of {@link ServiceTracker}. */
+public class ServiceTrackerException extends RuntimeException {
+  /** Wraps the underlying cause {@code t}. */
+  public ServiceTrackerException(Throwable t) {
+    super(t);
+  }
+  /** Creates the exception with a descriptive message and no cause. */
+  public ServiceTrackerException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
new file mode 100644
index 0000000..5828055
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.ReflectionUtil;
+/** Creates the {@link ServiceTracker} implementation selected by the configuration. */
+public class ServiceTrackerFactory {
+  /** Instantiates the HA tracker class when tajo.master.ha.enable is set, otherwise the default tracker class. */
+  public static ServiceTracker get(TajoConf conf) {
+    Class<ServiceTracker> trackerClass;
+
+    try {
+      if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+        trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.HA_SERVICE_TRACKER_CLASS);
+      } else {
+        trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.DEFAULT_SERVICE_TRACKER_CLASS);
+      }
+      return ReflectionUtil.newInstance(trackerClass, conf);
+
+    } catch (Throwable t) {
+      throw new RuntimeException(t); // every failure, Errors included, is rethrown unchecked with its cause
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
new file mode 100644
index 0000000..481b528
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.net.InetSocketAddress;
+/** Mutable holder for one TajoMaster's service addresses and its availability/active flags. */
+public class TajoMasterInfo {
+
+  private boolean available;
+  private boolean isActive;
+
+  private InetSocketAddress tajoMasterAddress; // umbilical RPC address (as populated by BaseServiceTracker)
+  private InetSocketAddress tajoClientAddress; // client RPC address
+  private InetSocketAddress workerResourceTrackerAddr; // resource tracker RPC address
+  private InetSocketAddress catalogAddress;
+  private InetSocketAddress webServerAddress; // master HTTP info address
+
+  public InetSocketAddress getTajoMasterAddress() {
+    return tajoMasterAddress;
+  }
+
+  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
+    this.tajoMasterAddress = tajoMasterAddress;
+  }
+
+  public InetSocketAddress getTajoClientAddress() {
+    return tajoClientAddress;
+  }
+
+  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
+    this.tajoClientAddress = tajoClientAddress;
+  }
+
+  public InetSocketAddress getWorkerResourceTrackerAddr() {
+    return workerResourceTrackerAddr;
+  }
+
+  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
+    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
+  }
+
+  public InetSocketAddress getCatalogAddress() {
+    return catalogAddress;
+  }
+
+  public void setCatalogAddress(InetSocketAddress catalogAddress) {
+    this.catalogAddress = catalogAddress;
+  }
+
+  public InetSocketAddress getWebServerAddress() {
+    return webServerAddress;
+  }
+
+  public void setWebServerAddress(InetSocketAddress webServerAddress) {
+    this.webServerAddress = webServerAddress;
+  }
+
+  public boolean isAvailable() {
+    return available;
+  }
+
+  public void setAvailable(boolean available) {
+    this.available = available;
+  }
+
+  public boolean isActive() {
+    return isActive;
+  }
+
+  public void setActive(boolean active) {
+    isActive = active; // no "this." needed: the parameter is named differently from the field
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index b1b6450..0304e92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -18,19 +18,23 @@
 
 package org.apache.tajo.benchmark;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.catalog.CatalogConstants;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.store.MemStore;
+import org.apache.tajo.client.DummyServiceTracker;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.FileUtil;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,9 +47,14 @@ public abstract class BenchmarkSet {
 
   public void init(TajoConf conf, String dataDir) throws IOException {
     this.dataDir = dataDir;
-    if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
-      tajo = new TajoClientImpl(NetUtils.createSocketAddr(
-          System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
+
+    if (System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname) != null) {
+
+      String addressStr = System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname);
+      InetSocketAddress addr = NetUtils.createSocketAddr(addressStr);
+      ServiceTracker serviceTracker = new DummyServiceTracker(addr);
+      tajo = new TajoClientImpl(conf, serviceTracker, null);
+
     } else {
       conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
       tajo = new TajoClientImpl(conf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
deleted file mode 100644
index 1329223..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The HAService is responsible for setting active TajoMaster on startup or when the
- * current active is changing (eg due to failure), monitoring the health of TajoMaster.
- *
- */
-public interface HAService {
-
-  /**
-   * Add master name to shared storage.
-   */
-  public void register() throws IOException;
-
-
-  /**
-   * Delete master name to shared storage.
-   *
-   */
-  public void delete() throws IOException;
-
-  /**
-   *
-   * @return True if current master is an active master.
-   */
-  public boolean isActiveStatus();
-
-  /**
-   *
-   * @return return all master list
-   * @throws IOException
-   */
-  public List<TajoMasterInfo> getMasters() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
deleted file mode 100644
index e18a9b2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
- *
- */
-public class HAServiceHDFSImpl implements HAService {
-  private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
-
-  private MasterContext context;
-  private TajoConf conf;
-
-  private FileSystem fs;
-
-  private String masterName;
-  private Path rootPath;
-  private Path haPath;
-  private Path activePath;
-  private Path backupPath;
-
-  private boolean isActiveStatus = false;
-
-  //thread which runs periodically to see the last time since a heartbeat is received.
-  private Thread checkerThread;
-  private volatile boolean stopped = false;
-
-  private int monitorInterval;
-
-  private String currentActiveMaster;
-
-  public HAServiceHDFSImpl(MasterContext context) throws IOException {
-    this.context = context;
-    this.conf = context.getConf();
-    initSystemDirectory();
-
-    InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
-    this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
-
-    monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
-  }
-
-  private void initSystemDirectory() throws IOException {
-    // Get Tajo root dir
-    this.rootPath = TajoConf.getTajoRootDir(conf);
-
-    // Check Tajo root dir
-    this.fs = rootPath.getFileSystem(conf);
-
-    // Check and create Tajo system HA dir
-    haPath = TajoConf.getSystemHADir(conf);
-    if (!fs.exists(haPath)) {
-      fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA dir '" + haPath + "' is created");
-    }
-
-    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-    if (!fs.exists(activePath)) {
-      fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA Active dir '" + activePath + "' is created");
-    }
-
-    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
-    if (!fs.exists(backupPath)) {
-      fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA Backup dir '" + backupPath + "' is created");
-    }
-  }
-
-  private void startPingChecker() {
-    if (checkerThread == null) {
-      checkerThread = new Thread(new PingChecker());
-      checkerThread.setName("Ping Checker");
-      checkerThread.start();
-    }
-  }
-
-  @Override
-  public void register() throws IOException {
-    FileStatus[] files = fs.listStatus(activePath);
-
-    // Phase 1: If there is not another active master, this try to become active master.
-    if (files.length == 0) {
-      createMasterFile(true);
-      currentActiveMaster = masterName;
-      LOG.info(String.format("This is added to active master (%s)", masterName));
-    } else {
-      // Phase 2: If there is active master information, we need to check its status.
-      Path activePath = files[0].getPath();
-      currentActiveMaster = activePath.getName().replaceAll("_", ":");
-
-      // Phase 3: If current active master is dead, this master should be active master.
-      if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
-        fs.delete(activePath, true);
-        createMasterFile(true);
-        currentActiveMaster = masterName;
-        LOG.info(String.format("This is added to active master (%s)", masterName));
-      } else {
-        // Phase 4: If current active master is alive, this master need to be backup master.
-        createMasterFile(false);
-        LOG.info(String.format("This is added to backup masters (%s)", masterName));
-      }
-    }
-  }
-
-  private void createMasterFile(boolean isActive) throws IOException {
-    String fileName = masterName.replaceAll(":", "_");
-    Path path = null;
-
-    if (isActive) {
-      path = new Path(activePath, fileName);
-    } else {
-      path = new Path(backupPath, fileName);
-    }
-
-    StringBuilder sb = new StringBuilder();
-    InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.CATALOG_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
-
-    FSDataOutputStream out = fs.create(path);
-
-    try {
-      out.writeUTF(sb.toString());
-      out.hflush();
-      out.close();
-    } catch (FileAlreadyExistsException e) {
-      createMasterFile(false);
-    }
-
-    if (isActive) {
-      isActiveStatus = true;
-    } else {
-      isActiveStatus = false;
-    }
-
-    startPingChecker();
-  }
-
-
-  private InetSocketAddress getHostAddress(int type) {
-    InetSocketAddress address = null;
-
-    switch (type) {
-      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
-        break;
-      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .TAJO_MASTER_CLIENT_RPC_ADDRESS);
-        break;
-      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .RESOURCE_TRACKER_RPC_ADDRESS);
-        break;
-      case HAConstants.CATALOG_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .CATALOG_ADDRESS);
-        break;
-      case HAConstants.MASTER_INFO_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-        .TAJO_MASTER_INFO_ADDRESS);
-      default:
-        break;
-    }
-
-    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
-  }
-
-  @Override
-  public void delete() throws IOException {
-    String fileName = masterName.replaceAll(":", "_");
-
-    Path activeFile = new Path(activePath, fileName);
-    if (fs.exists(activeFile)) {
-      fs.delete(activeFile, true);
-    }
-
-    Path backupFile = new Path(backupPath, fileName);
-    if (fs.exists(backupFile)) {
-      fs.delete(backupFile, true);
-    }
-    if (isActiveStatus) {
-      isActiveStatus = false;
-    }
-    stopped = true;
-  }
-
-  @Override
-  public boolean isActiveStatus() {
-    return isActiveStatus;
-  }
-
-  @Override
-  public List<TajoMasterInfo> getMasters() throws IOException {
-    List<TajoMasterInfo> list = TUtil.newList();
-    Path path = null;
-
-    FileStatus[] files = fs.listStatus(activePath);
-    if (files.length == 1) {
-      path = files[0].getPath();
-      list.add(createTajoMasterInfo(path, true));
-    }
-
-    files = fs.listStatus(backupPath);
-    for (FileStatus status : files) {
-      path = status.getPath();
-      list.add(createTajoMasterInfo(path, false));
-    }
-
-    return list;
-  }
-
-  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
-    String masterAddress = path.getName().replaceAll("_", ":");
-    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
-
-    FSDataInputStream stream = fs.open(path);
-    String data = stream.readUTF();
-
-    stream.close();
-
-    String[] addresses = data.split("_");
-    TajoMasterInfo info = new TajoMasterInfo();
-
-    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
-    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
-    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
-    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
-    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
-
-    info.setAvailable(isAlive);
-    info.setActive(isActive);
-
-    return info;
-  }
-
-  private class PingChecker implements Runnable {
-    @Override
-    public void run() {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        synchronized (HAServiceHDFSImpl.this) {
-          try {
-            if (!currentActiveMaster.equals(masterName)) {
-              boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
-                  + ", isAlive:" + isAlive);
-              }
-
-              // If active master is dead, this master should be active master instead of
-              // previous active master.
-              if (!isAlive) {
-                FileStatus[] files = fs.listStatus(activePath);
-                if (files.length == 0 || (files.length ==  1
-                  && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
-                  delete();
-                  register();
-                }
-              }
-            }
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-        try {
-          Thread.sleep(monitorInterval);
-        } catch (InterruptedException e) {
-          LOG.info("PingChecker interrupted. - masterName:" + masterName);
-          break;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
new file mode 100644
index 0000000..1475a5d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.service.HAServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+import org.apache.tajo.util.TUtil;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
+ *
+ */
+@SuppressWarnings("unused")
+public class HdfsServiceTracker extends HAServiceTracker {
+  private static Log LOG = LogFactory.getLog(HdfsServiceTracker.class);
+
+  // Configuration handed to the constructor; every path and address below is derived from it.
+  private TajoConf conf;
+
+  // File system of the Tajo root directory (resolved in initSystemDirectory()).
+  private FileSystem fs;
+
+  // "host:port" of this master, built from TAJO_MASTER_UMBILICAL_RPC_ADDRESS in the constructor.
+  // With ':' replaced by '_' it is also the name of this master's registration file.
+  private String masterName;
+  // Tajo root directory.
+  private Path rootPath;
+  // System HA directory; parent of the active and backup directories.
+  private Path haPath;
+  // Directory holding the registration file of the active master.
+  private Path activePath;
+  // Directory holding the registration files of the backup masters.
+  private Path backupPath;
+
+  // Set by createMasterFile() and cleared by delete().
+  private boolean isActiveStatus = false;
+
+  // Thread running PingChecker, which periodically probes the current active master.
+  private Thread checkerThread;
+  // Set to true by delete(); PingChecker leaves its loop once it observes the flag.
+  private volatile boolean stopped = false;
+
+  // Pause between two PingChecker rounds, passed to Thread.sleep() (i.e. milliseconds).
+  private int monitorInterval;
+
+  // "host:port" of the master this instance currently believes to be active (set in register()).
+  private String currentActiveMaster;
+
+  /**
+   * Prepares the HA directory layout and derives this master's name ("host:port") from
+   * the configured umbilical RPC address.
+   *
+   * @param conf Tajo configuration to read the addresses, paths and monitor interval from
+   * @throws IOException propagated from the file system calls made while preparing the directories
+   */
+  public HdfsServiceTracker(TajoConf conf) throws IOException {
+    this.conf = conf;
+    initSystemDirectory();
+
+    final InetSocketAddress umbilical = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+    final String host = umbilical.getAddress().getHostAddress();
+    this.masterName = host + ":" + umbilical.getPort();
+    this.monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+  }
+
+  /**
+   * Resolves the file system of the Tajo root directory and makes sure that the system HA
+   * directory and its active and backup sub-directories exist.
+   *
+   * @throws IOException propagated from the file system calls
+   */
+  private void initSystemDirectory() throws IOException {
+    rootPath = TajoConf.getTajoRootDir(conf);
+    fs = rootPath.getFileSystem(conf);
+
+    haPath = TajoConf.getSystemHADir(conf);
+    createDirIfMissing(haPath, "System HA dir");
+
+    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+    createDirIfMissing(activePath, "System HA Active dir");
+
+    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+    createDirIfMissing(backupPath, "System HA Backup dir");
+  }
+
+  /**
+   * Creates the given directory with the system resource permission when it does not exist yet.
+   *
+   * @param dir directory to check and, if necessary, create
+   * @param label human readable name of the directory used in the log message
+   * @throws IOException propagated from the file system calls
+   */
+  private void createDirIfMissing(Path dir, String label) throws IOException {
+    if (fs.exists(dir)) {
+      return;
+    }
+    fs.mkdirs(dir, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+    LOG.info(label + " '" + dir + "' is created");
+  }
+
+  /**
+   * Starts the "Ping Checker" thread the first time it is called; does nothing once a
+   * checker thread has been created.
+   */
+  private void startPingChecker() {
+    if (checkerThread != null) {
+      return;
+    }
+    checkerThread = new Thread(new PingChecker(), "Ping Checker");
+    checkerThread.start();
+  }
+
+  /**
+   * Registers this master. It becomes the active master when no active entry exists or when
+   * the registered active master does not respond (its entry is deleted first); otherwise it
+   * is registered as a backup master.
+   */
+  @Override
+  public void register() throws IOException {
+    FileStatus[] activeEntries = fs.listStatus(activePath);
+    boolean takeOver;
+
+    if (activeEntries.length == 0) {
+      // Nobody is registered as active, so this master claims the role.
+      takeOver = true;
+    } else {
+      // Somebody is registered as active: find out whether it still answers.
+      Path registeredActive = activeEntries[0].getPath();
+      currentActiveMaster = registeredActive.getName().replaceAll("_", ":");
+      takeOver = !HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+      if (takeOver) {
+        // The registered active master is dead; drop its entry before taking over.
+        fs.delete(registeredActive, true);
+      }
+    }
+
+    if (takeOver) {
+      createMasterFile(true);
+      currentActiveMaster = masterName;
+      LOG.info(String.format("This is added to active master (%s)", masterName));
+    } else {
+      createMasterFile(false);
+      LOG.info(String.format("This is added to backup masters (%s)", masterName));
+    }
+  }
+
+  /**
+   * Writes this master's registration file into the active or the backup directory, then
+   * updates the active flag and starts the ping checker. The file is named after
+   * {@code masterName} with ':' replaced by '_' and contains one UTF string of the form
+   *
+   * <pre>
+   * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT}
+   * </pre>
+   *
+   * @param isActive true to register as the active master, false to register as a backup master.
+   * @throws IOException if the registration file cannot be created or written
+   */
+  private void createMasterFile(boolean isActive) throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+    Path path = new Path(isActive ? activePath : backupPath, fileName);
+
+    StringBuilder sb = new StringBuilder();
+    InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.CATALOG_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
+
+    FSDataOutputStream out = fs.create(path);
+
+    try {
+      try {
+        out.writeUTF(sb.toString());
+        out.hsync();
+      } finally {
+        // Release the stream even when the write or the sync fails.
+        out.close();
+      }
+    } catch (FileAlreadyExistsException e) {
+      createMasterFile(false);
+    }
+
+    isActiveStatus = isActive;
+
+    startPingChecker();
+  }
+
+
+  /**
+   * Builds the address of one of this master's services: the host part of {@code masterName}
+   * combined with the port configured for the requested service.
+   *
+   * @param type one of the HAConstants address types handled in the switch below
+   * @return the socket address made of this master's host and the configured service port
+   * @throws IllegalArgumentException if {@code type} is not one of the handled address types
+   */
+  private InetSocketAddress getHostAddress(int type) {
+    InetSocketAddress address;
+
+    switch (type) {
+      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+        break;
+      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
+        break;
+      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+        break;
+      case HAConstants.CATALOG_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS);
+        break;
+      case HAConstants.MASTER_INFO_ADDRESS:
+        address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+        break;
+      default:
+        // Fail with a descriptive error instead of a NullPointerException further down.
+        throw new IllegalArgumentException("Unsupported address type: " + type);
+    }
+
+    // Only the port is taken from the configuration; the host always is this master's host.
+    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
+  }
+
+  /**
+   * Removes this master's registration file from both the active and the backup directory,
+   * clears the active flag and marks the tracker as stopped.
+   */
+  @Override
+  public void delete() throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+    Path[] registrations = {new Path(activePath, fileName), new Path(backupPath, fileName)};
+
+    for (Path registration : registrations) {
+      if (fs.exists(registration)) {
+        fs.delete(registration, true);
+      }
+    }
+
+    isActiveStatus = false;
+    stopped = true;
+  }
+
+  /**
+   * @return the active flag as last set by createMasterFile() or cleared by delete()
+   */
+  @Override
+  public boolean isActiveStatus() {
+    return this.isActiveStatus;
+  }
+
+  /**
+   * Lists the registered masters: the active master first (only when the active directory
+   * holds exactly one entry), followed by every entry of the backup directory.
+   */
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    List<TajoMasterInfo> masters = TUtil.newList();
+
+    FileStatus[] activeEntries = fs.listStatus(activePath);
+    if (activeEntries.length == 1) {
+      masters.add(createTajoMasterInfo(activeEntries[0].getPath(), true));
+    }
+
+    for (FileStatus backupEntry : fs.listStatus(backupPath)) {
+      masters.add(createTajoMasterInfo(backupEntry.getPath(), false));
+    }
+
+    return masters;
+  }
+
+  /**
+   * Builds a {@link TajoMasterInfo} from a registration file. The master (umbilical) address
+   * is taken from the file name; the client, resource tracker, catalog and web server
+   * addresses are read, in that order, from the '_'-separated file content.
+   *
+   * @param path registration file of the master
+   * @param isActive whether the file was found in the active directory
+   * @return the populated master information, including the result of a liveness probe
+   * @throws IOException if the file cannot be read or holds fewer than four addresses
+   */
+  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+    String masterAddress = path.getName().replaceAll("_", ":");
+    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+
+    String data;
+    FSDataInputStream stream = fs.open(path);
+    try {
+      data = stream.readUTF();
+    } finally {
+      // Release the stream even when reading fails.
+      stream.close();
+    }
+
+    String[] addresses = data.split("_");
+    if (addresses.length < 4) {
+      throw new IOException("Malformed master entry '" + path + "': expected 4 addresses, but found "
+          + addresses.length);
+    }
+
+    TajoMasterInfo info = new TajoMasterInfo();
+
+    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
+    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
+    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
+    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
+    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+
+    info.setAvailable(isAlive);
+    info.setActive(isActive);
+
+    return info;
+  }
+
+  /**
+   * Background task that, every {@code monitorInterval}, probes the master currently believed
+   * to be active and re-registers this master when that master no longer responds.
+   * The loop ends when {@code stopped} is set (delete() does so) or the thread is interrupted.
+   */
+  private class PingChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (HdfsServiceTracker.this) {
+          try {
+            // currentActiveMaster may not have been assigned yet when the checker starts.
+            if (currentActiveMaster != null && !currentActiveMaster.equals(masterName)) {
+              boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
+                  + ", isAlive:" + isAlive);
+              }
+
+              // If active master is dead, this master should be active master instead of
+              // previous active master.
+              if (!isAlive) {
+                FileStatus[] files = fs.listStatus(activePath);
+                if (files.length == 0 || (files.length ==  1
+                  && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
+                  delete();
+                  register();
+                }
+              }
+            }
+          } catch (Exception e) {
+            // Keep the checker running, but report the failure through the logger.
+            LOG.error("PingChecker failed. - masterName:" + masterName, e);
+          }
+        }
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info("PingChecker interrupted. - masterName:" + masterName);
+          // Preserve the interrupt status for whoever inspects this thread afterwards.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
+  // Positions of the service addresses within the list returned by getAddressElements():
+  // index 0 is taken from the active master entry's file name, indexes 1-4 from the
+  // '_'-separated content of that file.
+  private final static int MASTER_UMBILICAL_RPC_ADDRESS = 0;
+  private final static int MASTER_CLIENT_RPC_ADDRESS = 1;
+  private final static int RESOURCE_TRACKER_RPC_ADDRESS = 2;
+  private final static int CATALOG_ADDRESS = 3;
+  private final static int MASTER_HTTP_INFO = 4;
+
+  // Last address handed out per service. Each getter re-reads the active master entry
+  // when checkConnection() rejects the cached value.
+  private volatile InetSocketAddress umbilicalRpcAddr;
+  private volatile InetSocketAddress clientRpcAddr;
+  private volatile InetSocketAddress resourceTrackerRpcAddr;
+  private volatile InetSocketAddress catalogAddr;
+  private volatile InetSocketAddress masterHttpInfoAddr;
+
+  /**
+   * Returns the cached umbilical RPC address while checkConnection() accepts it; otherwise
+   * re-reads it from the active master entry.
+   */
+  @Override
+  public InetSocketAddress getUmbilicalAddress() {
+    if (checkConnection(umbilicalRpcAddr)) {
+      return umbilicalRpcAddr;
+    }
+
+    String umbilical = getAddressElements(conf).get(MASTER_UMBILICAL_RPC_ADDRESS);
+    umbilicalRpcAddr = NetUtils.createSocketAddr(umbilical);
+    return umbilicalRpcAddr;
+  }
+
+  /**
+   * Returns the cached client RPC address while checkConnection() accepts it; otherwise
+   * re-reads it from the active master entry.
+   */
+  @Override
+  public InetSocketAddress getClientServiceAddress() {
+    if (checkConnection(clientRpcAddr)) {
+      return clientRpcAddr;
+    }
+
+    String clientRpc = getAddressElements(conf).get(MASTER_CLIENT_RPC_ADDRESS);
+    clientRpcAddr = NetUtils.createSocketAddr(clientRpc);
+    return clientRpcAddr;
+  }
+
+  /**
+   * Returns the cached resource tracker RPC address while checkConnection() accepts it;
+   * otherwise re-reads it from the active master entry.
+   */
+  @Override
+  public InetSocketAddress getResourceTrackerAddress() {
+    if (checkConnection(resourceTrackerRpcAddr)) {
+      return resourceTrackerRpcAddr;
+    }
+
+    String resourceTracker = getAddressElements(conf).get(RESOURCE_TRACKER_RPC_ADDRESS);
+    resourceTrackerRpcAddr = NetUtils.createSocketAddr(resourceTracker);
+    return resourceTrackerRpcAddr;
+  }
+
+  /**
+   * Returns the cached catalog address while checkConnection() accepts it; otherwise
+   * re-reads it from the active master entry.
+   */
+  @Override
+  public InetSocketAddress getCatalogAddress() {
+    if (checkConnection(catalogAddr)) {
+      return catalogAddr;
+    }
+
+    String catalog = getAddressElements(conf).get(CATALOG_ADDRESS);
+    catalogAddr = NetUtils.createSocketAddr(catalog);
+    return catalogAddr;
+  }
+
+  /**
+   * Returns the cached master HTTP info address while checkConnection() accepts it;
+   * otherwise re-reads it from the active master entry.
+   */
+  @Override
+  public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+    if (checkConnection(masterHttpInfoAddr)) {
+      return masterHttpInfoAddr;
+    }
+
+    String httpInfo = getAddressElements(conf).get(MASTER_HTTP_INFO);
+    masterHttpInfoAddr = NetUtils.createSocketAddr(httpInfo);
+    return masterHttpInfoAddr;
+  }
+
+  /**
+   * Reads the single entry of the active master directory in HDFS and returns all service
+   * addresses of the active master. The first element is the umbilical RPC address (taken
+   * from the entry's file name); the remaining four are read from the '_'-separated content
+   * of the entry.
+   *
+   * @param conf configuration used to locate the file system and the HA directory
+   * @return the five service addresses of the active master, each as a "host:port" string
+   * @throws ServiceTrackerException if the active master entry is missing, ambiguous,
+   *         malformed or cannot be read
+   */
+  private static List<String> getAddressElements(TajoConf conf) throws ServiceTrackerException {
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activeMasterBaseDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+
+      if (!fs.exists(activeMasterBaseDir)) {
+        throw new ServiceTrackerException("No such active master base path: " + activeMasterBaseDir);
+      }
+      if (!fs.isDirectory(activeMasterBaseDir)) {
+        throw new ServiceTrackerException("Active master base path must be a directory.");
+      }
+
+      FileStatus[] files = fs.listStatus(activeMasterBaseDir);
+
+      if (files.length < 1) {
+        throw new ServiceTrackerException("No active master entry");
+      } else if (files.length > 1) {
+        throw new ServiceTrackerException("Two or more than active master entries.");
+      }
+
+      // We can ensure that there is only one file due to the above assertion.
+      Path activeMasterEntry = files[0].getPath();
+
+      if (!fs.isFile(activeMasterEntry)) {
+        throw new ServiceTrackerException("Active master entry must be a file, but it is a directory.");
+      }
+
+      List<String> addressElements = TUtil.newList();
+
+      addressElements.add(activeMasterEntry.getName().replaceAll("_", ":")); // Add UMBILICAL_RPC_ADDRESS to elements
+
+      String data;
+      FSDataInputStream stream = fs.open(activeMasterEntry);
+      try {
+        data = stream.readUTF();
+      } finally {
+        // Release the stream even when reading fails.
+        stream.close();
+      }
+
+      addressElements.addAll(TUtil.newList(data.split("_"))); // Add remains entries to elements
+
+      // ensure the number of entries
+      Preconditions.checkState(addressElements.size() == 5,
+          "Expected 5 service addresses, but found %s.", addressElements.size());
+
+      return addressElements;
+
+    } catch (ServiceTrackerException e) {
+      // Already carries a precise message; do not wrap it a second time.
+      throw e;
+    } catch (Throwable t) {
+      throw new ServiceTrackerException(t);
+    }
+  }
+
+
+  /**
+   * Convenience overload that normalizes the socket address and delegates to
+   * {@link #isMasterAlive(String, TajoConf)}.
+   */
+  public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
+    String normalized = org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(masterAddress);
+    return isMasterAlive(normalized, conf);
+  }
+
+  /**
+   * Probes a master by opening a TCP connection to it. The connect timeout is read from
+   * {@code CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY}.
+   *
+   * @param masterName address of the master to probe, in "host:port" form
+   * @param conf configuration providing the socket factory and the connect timeout
+   * @return true if the connection could be established, false on any failure
+   */
+  public static boolean isMasterAlive(String masterName, TajoConf conf) {
+    Socket socket = null;
+
+    try {
+      // how to create sockets
+      SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
+
+      int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
+          CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
+
+      InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
+
+      // connected socket
+      socket = socketFactory.createSocket();
+      org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
+      return true;
+    } catch (Exception e) {
+      return false;
+    } finally {
+      // The socket only serves as a probe; always release it so probes do not leak descriptors.
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // Nothing useful can be done if closing the probe socket fails.
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up how the given master is registered in the HA directory.
+   *
+   * @param masterName address of the master, in "host:port" form
+   * @param conf configuration used to locate the file system and the HA directory
+   * @return 0 if the master is registered as a backup master, 1 if it is the registered active
+   *         master, -2 if it is registered as neither, or -1 if the lookup failed
+   */
+  public static int getState(String masterName, TajoConf conf) {
+    String targetMaster = masterName.replaceAll(":", "_");
+    int retValue = -1;
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Check backup masters
+      for (FileStatus status : fs.listStatus(backupPath)) {
+        if (status.getPath().getName().equals(targetMaster)) {
+          return 0;
+        }
+      }
+
+      // Check active master
+      FileStatus[] files = fs.listStatus(activePath);
+      if (files.length == 1 && files[0].getPath().getName().equals(targetMaster)) {
+        return 1;
+      }
+      retValue = -2;
+    } catch (Exception e) {
+      // Report through the logger instead of printing to stderr; the caller receives -1.
+      LOG.error("Failed to look up the HA state of " + masterName, e);
+    }
+    return retValue;
+  }
+
+  /**
+   * Deletes the whole system HA directory, provided that none of the registered masters
+   * (backup or active) can be reached.
+   *
+   * @param conf configuration used to locate the HA directory and to probe the masters
+   * @return 1 after the HA directory delete was issued, 0 if at least one registered master
+   *         is still alive (nothing is deleted), or -1 if an error occurred
+   */
+  public static int formatHA(TajoConf conf) {
+    int retValue = -1;
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      int aliveMasterCount = 0;
+      // Check backup masters
+      for (FileStatus status : fs.listStatus(backupPath)) {
+        if (isMasterAlive(status.getPath().getName().replaceAll("_", ":"), conf)) {
+          aliveMasterCount++;
+        }
+      }
+
+      // Check active master
+      FileStatus[] files = fs.listStatus(activePath);
+      if (files.length == 1 && isMasterAlive(files[0].getPath().getName().replaceAll("_", ":"), conf)) {
+        aliveMasterCount++;
+      }
+
+      // If there is any alive master, users can't format storage.
+      if (aliveMasterCount > 0) {
+        return 0;
+      }
+
+      // delete ha path.
+      fs.delete(TajoConf.getSystemHADir(conf), true);
+      retValue = 1;
+    } catch (Exception e) {
+      // Report through the logger instead of printing to stderr; the caller receives -1.
+      LOG.error("Failed to format the HA directory", e);
+    }
+    return retValue;
+  }
+
+
+  /**
+   * Lists the addresses ("host:port") of all registered masters: every entry of the backup
+   * directory first, then the active master when the active directory holds exactly one entry.
+   *
+   * @param conf configuration used to locate the file system and the HA directory
+   * @return the registered master addresses; if an error occurs, whatever was collected
+   *         up to that point (possibly an empty list)
+   */
+  public static List<String> getMasters(TajoConf conf) {
+    List<String> list = new ArrayList<String>();
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Check backup masters
+      for (FileStatus status : fs.listStatus(backupPath)) {
+        list.add(status.getPath().getName().replaceAll("_", ":"));
+      }
+
+      // Check active master
+      FileStatus[] files = fs.listStatus(activePath);
+      if (files.length == 1) {
+        list.add(files[0].getPath().getName().replaceAll("_", ":"));
+      }
+
+    } catch (Exception e) {
+      // Report through the logger instead of printing to stderr; return what was collected.
+      LOG.error("Failed to list the registered masters", e);
+    }
+    return list;
+  }
+
+  /**
+   * Resolves the file system that hosts the Tajo root directory.
+   *
+   * @param conf configuration naming the Tajo root directory
+   * @return the file system of the Tajo root directory
+   * @throws IOException propagated from the file system lookup
+   */
+  private static FileSystem getFileSystem(TajoConf conf) throws IOException {
+    return TajoConf.getTajoRootDir(conf).getFileSystem(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
deleted file mode 100644
index c6fdd40..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.net.InetSocketAddress;
-
-public class TajoMasterInfo {
-
-  private boolean available;
-  private boolean isActive;
-
-  private InetSocketAddress tajoMasterAddress;
-  private InetSocketAddress tajoClientAddress;
-  private InetSocketAddress workerResourceTrackerAddr;
-  private InetSocketAddress catalogAddress;
-  private InetSocketAddress webServerAddress;
-
-  public InetSocketAddress getTajoMasterAddress() {
-    return tajoMasterAddress;
-  }
-
-  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
-    this.tajoMasterAddress = tajoMasterAddress;
-  }
-
-  public InetSocketAddress getTajoClientAddress() {
-    return tajoClientAddress;
-  }
-
-  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
-    this.tajoClientAddress = tajoClientAddress;
-  }
-
-  public InetSocketAddress getWorkerResourceTrackerAddr() {
-    return workerResourceTrackerAddr;
-  }
-
-  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
-    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
-  }
-
-  public InetSocketAddress getCatalogAddress() {
-    return catalogAddress;
-  }
-
-  public void setCatalogAddress(InetSocketAddress catalogAddress) {
-    this.catalogAddress = catalogAddress;
-  }
-
-  public InetSocketAddress getWebServerAddress() {
-    return webServerAddress;
-  }
-
-  public void setWebServerAddress(InetSocketAddress webServerAddress) {
-    this.webServerAddress = webServerAddress;
-  }
-
-  public boolean isAvailable() {
-    return available;
-  }
-
-  public void setAvailable(boolean available) {
-    this.available = available;
-  }
-
-  public boolean isActive() {
-    return isActive;
-  }
-
-  public void setActive(boolean active) {
-    isActive = active;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 42ffd87..996d356 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -38,6 +38,8 @@ import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.worker.TajoWorker;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -45,6 +47,7 @@ import java.util.List;
 
 public class TajoContainerProxy extends ContainerProxy {
   private final QueryContext queryContext;
+  private final TajoWorker.WorkerContext workerContext;
   private final String planJson;
 
   public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
@@ -52,6 +55,7 @@ public class TajoContainerProxy extends ContainerProxy {
                             QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
     super(context, conf, executionBlockId, container);
     this.queryContext = queryContext;
+    this.workerContext = context.getQueryMasterContext().getWorkerContext();
     this.planJson = planJson;
   }
 
@@ -171,27 +175,8 @@ public class TajoContainerProxy extends ContainerProxy {
     RpcConnectionPool connPool = RpcConnectionPool.getPool();
     NettyClientBase tmClient = null;
     try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      TajoConf conf = context.getConf();
-      if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-        } catch (Exception e) {
-          context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(conf));
-          context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(conf));
-          tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              QueryCoordinatorProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            QueryCoordinatorProtocol.class, true);
-      }
+      ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker();
+      tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
 
       QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,

http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 786025a..a11606f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -40,18 +40,18 @@ import org.apache.tajo.catalog.LocalCatalogWrapper;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.FunctionLoader;
-import org.apache.tajo.ha.HAService;
-import org.apache.tajo.ha.HAServiceHDFSImpl;
-import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
-import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.SessionManager;
+import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rule.EvaluationContext;
 import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.session.SessionManager;
 import org.apache.tajo.storage.FileStorageManager;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.*;
@@ -127,7 +127,7 @@ public class TajoMaster extends CompositeService {
 
   private TajoSystemMetrics systemMetrics;
 
-  private HAService haService;
+  private ServiceTracker haService;
 
   private JvmPauseMonitor pauseMonitor;
 
@@ -226,15 +226,6 @@ public class TajoMaster extends CompositeService {
     }
   }
 
-
-  private void initHAManger() throws Exception {
-    // If tajo provides haService based on ZooKeeper, following codes need to update.
-    if (systemConf.getBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      haService = new HAServiceHDFSImpl(context);
-      haService.register();
-    }
-  }
-
   public boolean isActiveMaster() {
     return (haService != null ? haService.isActiveStatus() : true);
   }
@@ -326,11 +317,8 @@ public class TajoMaster extends CompositeService {
 
     initSystemMetrics();
 
-    try {
-      initHAManger();
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
+    haService = ServiceTrackerFactory.get(systemConf);
+    haService.register();
 
     historyWriter = new HistoryWriter(getMasterName(), true);
     historyWriter.init(getConfig());
@@ -477,7 +465,7 @@ public class TajoMaster extends CompositeService {
       return systemMetrics;
     }
 
-    public HAService getHAService() {
+    public ServiceTracker getHAService() {
       return haService;
     }
 


[02/10] tajo git commit: TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port. (missing file)

Posted by ji...@apache.org.
TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port. (missing file)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/015913b7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/015913b7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/015913b7

Branch: refs/heads/index_support
Commit: 015913b7a7eec922475a67d9455457f229367af7
Parents: b9719ba
Author: JaeHwa Jung <bl...@apache.org>
Authored: Wed Jan 28 14:31:34 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Wed Jan 28 14:31:34 2015 +0900

----------------------------------------------------------------------
 tajo-docs/src/main/sphinx/hbase_integration.rst | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/015913b7/tajo-docs/src/main/sphinx/hbase_integration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/hbase_integration.rst b/tajo-docs/src/main/sphinx/hbase_integration.rst
index 73ef6d1..779223b 100644
--- a/tajo-docs/src/main/sphinx/hbase_integration.rst
+++ b/tajo-docs/src/main/sphinx/hbase_integration.rst
@@ -29,14 +29,16 @@ CREATE TABLE
   USING hbase
   WITH ('table'='<hbase_table_name>'
   , 'columns'=':key,<column_family_name>:<qualifier_name>, ...'
-  , 'hbase.zookeeper.quorum'='<zookeeper_address>')
+  , 'hbase.zookeeper.quorum'='<zookeeper_address>'
+  , 'hbase.zookeeper.property.clientPort'='<zookeeper_client_port>'
+  )
 
 Options
 
 * ``table`` : Set the origin HBase table name. If you want to create an external table, the table must already exist in HBase. Conversely, if you want to create a managed table, the table must not exist in HBase.
 * ``columns`` : :key means the HBase row key. The number of column entries needs to equal the number of Tajo table columns.
 * ``hbase.zookeeper.quorum`` : Set the ZooKeeper quorum address. You can use different ZooKeeper clusters within the same Tajo database. If you don't set the ZooKeeper address, Tajo will refer to the property in the hbase-site.xml file.
-
+* ``hbase.zookeeper.property.clientPort`` : Set the ZooKeeper client port. If you don't set the port, Tajo will refer to the property in the hbase-site.xml file.
 
 ``IF NOT EXISTS`` allows the ``CREATE [EXTERNAL] TABLE`` statement to avoid the error that occurs when the table already exists.
 


[09/10] tajo git commit: TAJO-1321: Cli prints wrong response time. (jihoon)

Posted by ji...@apache.org.
TAJO-1321: Cli prints wrong response time. (jihoon)

Closes #369


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/58bbb1bb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/58bbb1bb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/58bbb1bb

Branch: refs/heads/index_support
Commit: 58bbb1bb424288fdb3e731374de9938911d8e0f3
Parents: 1e00759
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Feb 2 21:26:40 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Feb 2 21:27:01 2015 +0900

----------------------------------------------------------------------
 CHANGES                                                      | 2 ++
 .../main/java/org/apache/tajo/master/QueryInProgress.java    | 2 +-
 .../src/main/java/org/apache/tajo/master/QueryManager.java   | 4 ----
 .../java/org/apache/tajo/master/TajoMasterClientService.java | 8 ++------
 .../main/java/org/apache/tajo/querymaster/QueryMaster.java   | 1 -
 tajo-core/src/main/proto/QueryCoordinatorProtocol.proto      | 1 -
 6 files changed, 5 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 666ee9a..3667991 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,8 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1321: Cli prints wrong response time. (jihoon)
+
     TAJO-1313: Tajo-dump creates DDLs for information_schema tables.
     (jihun)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index e7371dd..45bdc5a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -208,7 +208,6 @@ public class QueryInProgress {
 
       this.queryInfo.setQueryState(queryInfo.getQueryState());
       this.queryInfo.setProgress(queryInfo.getProgress());
-      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
 
       // Update diagnosis message
       if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
@@ -223,6 +222,7 @@ public class QueryInProgress {
 
 
       if (isFinishState(this.queryInfo.getQueryState())) {
+        this.queryInfo.setFinishTime(System.currentTimeMillis());
         masterContext.getQueryJobManager().getEventHandler().handle(
             new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index bc6f07b..db895ef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -320,10 +320,6 @@ public class QueryManager extends CompositeService {
     queryInfo.setQueryState(queryHeartbeat.getState());
     queryInfo.setProgress(queryHeartbeat.getQueryProgress());
 
-    if (queryHeartbeat.hasQueryFinishTime()) {
-      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
-    }
-
     if (queryHeartbeat.hasResultDesc()) {
       queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 16e4fea..6af3248 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -393,9 +393,7 @@ public class TajoMasterClientService extends AbstractService {
           infoBuilder.setState(queryInfo.getQueryState());
           infoBuilder.setQuery(queryInfo.getSql());
           infoBuilder.setStartTime(queryInfo.getStartTime());
-          long endTime = (queryInfo.getFinishTime() == 0) ?
-                         System.currentTimeMillis() : queryInfo.getFinishTime();
-          infoBuilder.setFinishTime(endTime);
+          infoBuilder.setFinishTime(System.currentTimeMillis());
           infoBuilder.setProgress(queryInfo.getProgress());
           infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
           infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
@@ -428,9 +426,7 @@ public class TajoMasterClientService extends AbstractService {
           infoBuilder.setState(queryInfo.getQueryState());
           infoBuilder.setQuery(queryInfo.getSql());
           infoBuilder.setStartTime(queryInfo.getStartTime());
-          long endTime = (queryInfo.getFinishTime() == 0) ?
-              System.currentTimeMillis() : queryInfo.getFinishTime();
-          infoBuilder.setFinishTime(endTime);
+          infoBuilder.setFinishTime(queryInfo.getFinishTime());
           infoBuilder.setProgress(queryInfo.getProgress());
           infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
           infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());

http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index a30df54..234a46a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -391,7 +391,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
         builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto());
       }
       builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
-      builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
index 41a382f..cef385e 100644
--- a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -69,7 +69,6 @@ message TajoHeartbeat {
   optional TableDescProto resultDesc = 4;
   optional string statusMessage = 5;
   optional float queryProgress = 6;
-  optional int64 queryFinishTime = 7;
 }
 
 message TajoHeartbeatResponse {