You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/02/02 13:29:06 UTC
[01/10] tajo git commit: TAJO-1320: HBaseStorageManager need to
support Zookeeper Client Port. (jaehwa)
Repository: tajo
Updated Branches:
refs/heads/index_support 7cafc33b0 -> 31698f537
TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port. (jaehwa)
Closes #363
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b9719ba7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b9719ba7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b9719ba7
Branch: refs/heads/index_support
Commit: b9719ba78ef441772ee7f5ffa44844627df95891
Parents: c429c97
Author: JaeHwa Jung <bl...@apache.org>
Authored: Wed Jan 28 13:33:01 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Wed Jan 28 13:33:01 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +++
.../apache/tajo/storage/hbase/HBaseStorageConstants.java | 1 +
.../apache/tajo/storage/hbase/HBaseStorageManager.java | 11 +++++++++++
3 files changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/b9719ba7/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 453e163..3b59347 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.10.0 - unreleased
IMPROVEMENT
+ TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port.
+ (jaehwa)
+
TAJO-1309: Add missing break point in physical operator. (jinho)
TAJO-1307: HBaseStorageManager need to support for users to use
http://git-wip-us.apache.org/repos/asf/tajo/blob/b9719ba7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
index 2c525a1..99140e6 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
@@ -27,6 +27,7 @@ public interface HBaseStorageConstants {
public static final String META_SPLIT_ROW_KEYS_KEY = "hbase.split.rowkeys";
public static final String META_SPLIT_ROW_KEYS_FILE_KEY = "hbase.split.rowkeys.file";
public static final String META_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
+ public static final String META_ZK_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
public static final String META_ROWKEY_DELIMITER = "hbase.rowkey.delimiter";
public static final String INSERT_PUT_MODE = "tajo.hbase.insert.put.mode";
http://git-wip-us.apache.org/repos/asf/tajo/blob/b9719ba7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
index 59d1b48..2a635d8 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -312,6 +312,17 @@ public class HBaseStorageManager extends StorageManager {
HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute.");
}
+ String zkPort = hbaseConf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+ if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_CLIENT_PORT)) {
+ zkPort = tableMeta.getOption(HBaseStorageConstants.META_ZK_CLIENT_PORT, "");
+ hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort);
+ }
+
+ if (zkPort == null || zkPort.trim().isEmpty()) {
+ throw new IOException("HBase mapped table is required a '" +
+ HBaseStorageConstants.META_ZK_CLIENT_PORT + "' attribute.");
+ }
+
for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) {
String key = eachOption.getKey();
if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
[06/10] tajo git commit: TAJO-1322: Invalid stored caching on
StorageManager. (jinho)
Posted by ji...@apache.org.
TAJO-1322: Invalid stored caching on StorageManager. (jinho)
Closes #367
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e656ee28
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e656ee28
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e656ee28
Branch: refs/heads/index_support
Commit: e656ee2872e660d95c25e7b080064f44a8d9d01e
Parents: 4595375
Author: jhkim <jh...@apache.org>
Authored: Mon Feb 2 14:42:24 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Feb 2 14:42:24 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/storage/StorageManager.java | 40 ++++++-------
.../tajo/storage/TestFileStorageManager.java | 60 +++++++++++++++-----
3 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3b59347..4a3715b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,8 @@ Release 0.10.0 - unreleased
BUG FIXES
+ TAJO-1322: Invalid stored caching on StorageManager. (jinho)
+
TAJO-1319: Tajo can't find HBase configuration file. (jaehwa)
TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT
http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 34caa80..d929591 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -42,7 +42,6 @@ import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.net.URI;
import java.text.NumberFormat;
import java.util.List;
import java.util.Map;
@@ -284,14 +283,11 @@ public abstract class StorageManager {
* @throws java.io.IOException
*/
public static StorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException {
- URI uri;
TajoConf copiedConf = new TajoConf(tajoConf);
if (warehousePath != null) {
copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString());
}
- uri = TajoConf.getWarehouseDir(copiedConf).toUri();
- String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
- return getStorageManager(copiedConf, StoreType.CSV, key);
+ return getStorageManager(copiedConf, StoreType.CSV);
}
/**
@@ -303,7 +299,7 @@ public abstract class StorageManager {
* @throws java.io.IOException
*/
public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
- if ("HBASE".equals(storeType)) {
+ if ("HBASE".equalsIgnoreCase(storeType)) {
return getStorageManager(tajoConf, StoreType.HBASE);
} else {
return getStorageManager(tajoConf, StoreType.CSV);
@@ -319,7 +315,12 @@ public abstract class StorageManager {
* @throws java.io.IOException
*/
public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException {
- return getStorageManager(tajoConf, storeType, null);
+ FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+ if (fileSystem != null) {
+ return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
+ } else {
+ return getStorageManager(tajoConf, storeType, null);
+ }
}
/**
@@ -331,22 +332,23 @@ public abstract class StorageManager {
* @return
* @throws java.io.IOException
*/
- public static synchronized StorageManager getStorageManager (
+ private static synchronized StorageManager getStorageManager (
TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException {
+
+ String typeName;
+ switch (storeType) {
+ case HBASE:
+ typeName = "hbase";
+ break;
+ default:
+ typeName = "hdfs";
+ }
+
synchronized (storageManagers) {
- String storeKey = CatalogUtil.getStoreTypeString(storeType) + managerKey;
+ String storeKey = typeName + "_" + managerKey;
StorageManager manager = storageManagers.get(storeKey);
- if (manager == null) {
- String typeName = "hdfs";
-
- switch (storeType) {
- case HBASE:
- typeName = "hbase";
- break;
- default:
- typeName = "hdfs";
- }
+ if (manager == null) {
Class<? extends StorageManager> storageManagerClass =
tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
index 19a39a2..c4df8d7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -38,7 +38,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
@@ -48,7 +47,6 @@ import static org.junit.Assert.*;
public class TestFileStorageManager {
private TajoConf conf;
private static String TEST_PATH = "target/test-data/TestFileStorageManager";
- StorageManager sm = null;
private Path testDir;
private FileSystem fs;
@@ -57,7 +55,6 @@ public class TestFileStorageManager {
conf = new TajoConf();
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
- sm = StorageManager.getFileStorageManager(conf, testDir);
}
@After
@@ -84,14 +81,17 @@ public class TestFileStorageManager {
Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
fs.mkdirs(path.getParent());
- Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, path);
+ FileStorageManager fileStorageManager = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
+
+ Appender appender = fileStorageManager.getAppender(meta, schema, path);
appender.init();
for(Tuple t : tuples) {
appender.addTuple(t);
}
appender.close();
- Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(meta, schema, path);
+ Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
scanner.init();
int i=0;
while(scanner.next() != null) {
@@ -110,6 +110,9 @@ public class TestFileStorageManager {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
+ cluster.waitClusterUp();
+ TajoConf tajoConf = new TajoConf(conf);
+ tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
int testCount = 10;
Path tablePath = new Path("/testGetSplit");
@@ -125,7 +128,8 @@ public class TestFileStorageManager {
}
assertTrue(fs.exists(tablePath));
- FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath);
+ assertEquals(fs.getUri(), sm.getFileSystem().getUri());
Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
@@ -148,10 +152,7 @@ public class TestFileStorageManager {
assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
fs.close();
} finally {
- cluster.shutdown();
-
- File dir = new File(testDataPath);
- dir.delete();
+ cluster.shutdown(true);
}
}
@@ -165,6 +166,10 @@ public class TestFileStorageManager {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
+ cluster.waitClusterUp();
+
+ TajoConf tajoConf = new TajoConf(conf);
+ tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
int testCount = 10;
Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
@@ -177,7 +182,8 @@ public class TestFileStorageManager {
DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
}
assertTrue(fs.exists(tablePath));
- FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath);
+ assertEquals(fs.getUri(), sm.getFileSystem().getUri());
Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
@@ -194,10 +200,36 @@ public class TestFileStorageManager {
assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
fs.close();
} finally {
- cluster.shutdown();
+ cluster.shutdown(true);
+ }
+ }
- File dir = new File(testDataPath);
- dir.delete();
+ @Test
+ public void testStoreType() throws Exception {
+ final Configuration hdfsConf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+ .numDataNodes(2).build();
+ cluster.waitClusterUp();
+
+ TajoConf tajoConf = new TajoConf(hdfsConf);
+ tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+ try {
+ /* Local FileSystem */
+ FileStorageManager sm = (FileStorageManager)StorageManager.getStorageManager(conf, StoreType.CSV);
+ assertEquals(fs.getUri(), sm.getFileSystem().getUri());
+
+ /* Distributed FileSystem */
+ sm = (FileStorageManager)StorageManager.getStorageManager(tajoConf, StoreType.CSV);
+ assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
+ assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri());
+ } finally {
+ cluster.shutdown(true);
}
}
}
[08/10] tajo git commit: TAJO-1313: Tajo-dump creates DDLs for
information_schema tables
Posted by ji...@apache.org.
TAJO-1313: Tajo-dump creates DDLs for information_schema tables
Closes #365
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1e007595
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1e007595
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1e007595
Branch: refs/heads/index_support
Commit: 1e007595dc4ebbcab42d8e32e9d519e7cbf6f96b
Parents: fd73074
Author: Jihun Kang <ji...@apache.org>
Authored: Mon Feb 2 17:22:15 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Mon Feb 2 17:22:15 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +++
.../org/apache/tajo/catalog/CatalogConstants.java | 2 ++
.../java/org/apache/tajo/cli/tools/TajoDump.java | 16 +++++++++++++++-
3 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1e007595/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8466a38..666ee9a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,9 @@ Release 0.10.0 - unreleased
BUG FIXES
+ TAJO-1313: Tajo-dump creates DDLs for information_schema tables.
+ (jihun)
+
TAJO-1322: Invalid stored caching on StorageManager. (jinho)
TAJO-1319: Tajo can't find HBase configuration file. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1e007595/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 6ec52b9..a8c5c9b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -53,4 +53,6 @@ public class CatalogConstants {
public static final String COL_DATABASES_PK = "DB_ID";
public static final String COL_TABLES_PK = "TID";
public static final String COL_TABLES_NAME = "TABLE_NAME";
+
+ public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema";
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1e007595/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
index 750ead0..7f38a5d 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoDump.java
@@ -19,11 +19,14 @@
package org.apache.tajo.cli.tools;
import com.google.protobuf.ServiceException;
+
import org.apache.commons.cli.*;
import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.catalog.CatalogConstants;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.DDLBuilder;
import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
@@ -116,6 +119,10 @@ public class TajoDump {
System.exit(0);
}
+
+ private static boolean isAcceptableDumpingDatabase(String databaseName) {
+ return (databaseName == null || !databaseName.equalsIgnoreCase(CatalogConstants.INFORMATION_SCHEMA_DB_NAME));
+ }
public static void dump(TajoClient client, UserRoleInfo userInfo, String baseDatabaseName,
boolean isDumpingAllDatabases, boolean includeUserName, boolean includeDate, PrintWriter out)
@@ -128,7 +135,9 @@ public class TajoDump {
Collections.sort(sorted);
for (String databaseName : sorted) {
- dumpDatabase(client, databaseName, out);
+ if (isAcceptableDumpingDatabase(databaseName)) {
+ dumpDatabase(client, databaseName, out);
+ }
}
} else {
dumpDatabase(client, baseDatabaseName, out);
@@ -166,6 +175,11 @@ public class TajoDump {
for (String tableName : tableNames) {
try {
TableDesc table = client.getTableDesc(CatalogUtil.buildFQName(databaseName, tableName));
+
+ if (table.getMeta().getStoreType() == StoreType.SYSTEM) {
+ continue;
+ }
+
if (table.isExternal()) {
writer.write(DDLBuilder.buildDDLForExternalTable(table));
} else {
[05/10] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/255046be
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/255046be
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/255046be
Branch: refs/heads/index_support
Commit: 255046be8fb2eeeb6155322bff9a2bb20ad3e6ee
Parents: 7cafc33 4595375
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Jan 30 23:32:33 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Jan 30 23:32:33 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../tajo/catalog/AbstractCatalogClient.java | 12 +-
.../apache/tajo/client/DummyServiceTracker.java | 84 +++
.../apache/tajo/client/SessionConnection.java | 44 +-
.../org/apache/tajo/client/TajoClientImpl.java | 38 +-
.../apache/tajo/client/TajoHAClientUtil.java | 14 +-
.../java/org/apache/tajo/conf/TajoConf.java | 7 +-
.../java/org/apache/tajo/ha/HAServiceUtil.java | 39 --
.../apache/tajo/service/BaseServiceTracker.java | 97 ++++
.../apache/tajo/service/HAServiceTracker.java | 48 ++
.../org/apache/tajo/service/ServiceTracker.java | 63 ++
.../tajo/service/ServiceTrackerException.java | 30 +
.../tajo/service/ServiceTrackerFactory.java | 41 ++
.../org/apache/tajo/service/TajoMasterInfo.java | 89 +++
.../org/apache/tajo/benchmark/BenchmarkSet.java | 15 +-
.../main/java/org/apache/tajo/ha/HAService.java | 56 --
.../org/apache/tajo/ha/HAServiceHDFSImpl.java | 316 ----------
.../org/apache/tajo/ha/HdfsServiceTracker.java | 576 +++++++++++++++++++
.../java/org/apache/tajo/ha/TajoMasterInfo.java | 89 ---
.../apache/tajo/master/TajoContainerProxy.java | 27 +-
.../java/org/apache/tajo/master/TajoMaster.java | 30 +-
.../apache/tajo/querymaster/QueryMaster.java | 66 +--
.../main/java/org/apache/tajo/util/JSPUtil.java | 12 +-
.../tajo/worker/TajoResourceAllocator.java | 28 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 46 +-
.../tajo/worker/TajoWorkerClientService.java | 2 +-
.../tajo/worker/WorkerHeartbeatService.java | 20 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 11 +-
.../resources/webapps/admin/catalogview.jsp | 5 +-
.../main/resources/webapps/admin/cluster.jsp | 7 +-
.../src/main/resources/webapps/admin/index.jsp | 7 +-
.../resources/webapps/admin/query_executor.jsp | 5 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 10 +-
tajo-docs/src/main/sphinx/hbase_integration.rst | 6 +-
.../org/apache/tajo/jdbc/JdbcConnection.java | 10 +-
.../org/apache/tajo/jdbc/TajoStatement.java | 4 +-
.../org/apache/tajo/storage/StorageUtil.java | 16 -
.../storage/hbase/HBaseStorageConstants.java | 1 +
.../tajo/storage/hbase/HBaseStorageManager.java | 11 +
39 files changed, 1175 insertions(+), 810 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 53e5ab4,718f7d6..2483c1f
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -35,8 -34,9 +34,10 @@@ import org.apache.tajo.rpc.RpcConnectio
import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+ import org.apache.tajo.service.ServiceTracker;
+ import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.TUtil;
import java.net.InetSocketAddress;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 9522746,f8eef28..6f6059e
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@@ -32,10 -31,7 +32,8 @@@ import org.apache.tajo.catalog.TableDes
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.conf.TajoConf.ConfVars;
- import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.jdbc.TajoResultSet;
http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/255046be/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
[03/10] tajo git commit: TAJO-1306: HAServiceUtil should not directly
use HDFS.
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 1496b62..a30df54 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -44,6 +44,7 @@ import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.worker.TajoWorker;
@@ -229,23 +230,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
// worker may fail to connect existing active master. Thus,
// if worker can't connect the master, worker should try to connect another master and
// update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
- } else {
- rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
+ ServiceTracker serviceTracker = workerContext.getServiceTracker();
+ rpc = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocolService masterService = rpc.getStub();
CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
@@ -353,24 +340,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
NettyClientBase tmClient = null;
try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
+ tmClient = connPool.getConnection(workerContext.getServiceTracker().getUmbilicalAddress(),
+ QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
@@ -474,32 +445,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
for(QueryMasterTask eachTask: tempTasks) {
NettyClientBase tmClient;
try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
+ ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker();
+ tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(),
+ QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
- CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack =
- new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>();
-
+ CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 82fb37f..13f4dcc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -23,16 +23,16 @@ import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.ha.HAService;
import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Stage;
-import org.apache.tajo.util.history.TaskHistory;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.history.StageHistory;
-import org.apache.tajo.worker.TaskRunnerHistory;
+import org.apache.tajo.util.history.TaskHistory;
import org.apache.tajo.worker.TaskRunner;
+import org.apache.tajo.worker.TaskRunnerHistory;
import java.text.DecimalFormat;
import java.util.*;
@@ -191,7 +191,7 @@ public class JSPUtil {
}
public static String getMasterActiveLabel(MasterContext context) {
- HAService haService = context.getHAService();
+ ServiceTracker haService = context.getHAService();
String activeLabel = "";
if (haService != null) {
if (haService.isActiveStatus()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index dd408c9..c6a06f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -51,6 +51,7 @@ import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.ApplicationIdUtils;
import java.net.InetSocketAddress;
@@ -274,31 +275,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
RpcConnectionPool connPool = RpcConnectionPool.getPool();
NettyClientBase tmClient = null;
try {
-
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- } catch (Exception e) {
- queryTaskContext.getQueryMasterContext().getWorkerContext().
- setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
- queryTaskContext.getQueryMasterContext().getWorkerContext().
- setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
- tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
-
+ ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
+ tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index c217e3e..7f73916 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -34,8 +34,9 @@ import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ha.TajoMasterInfo;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.service.TajoMasterInfo;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -63,7 +64,6 @@ import java.io.*;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -112,6 +112,8 @@ public class TajoWorker extends CompositeService {
@Deprecated
private boolean taskRunnerMode;
+ private ServiceTracker serviceTracker;
+
private WorkerHeartbeatService workerHeartbeatThread;
private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -189,6 +191,8 @@ public class TajoWorker extends CompositeService {
this.systemConf = (TajoConf)conf;
RackResolver.init(systemConf);
+ serviceTracker = ServiceTrackerFactory.get(systemConf);
+
this.workerContext = new WorkerContext();
this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -352,8 +356,8 @@ public class TajoWorker extends CompositeService {
tajoMasterInfo = new TajoMasterInfo();
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+ tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress());
+ tajoMasterInfo.setWorkerResourceTrackerAddr(serviceTracker.getResourceTrackerAddress());
} else {
tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
.TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
@@ -425,18 +429,14 @@ public class TajoWorker extends CompositeService {
return systemConf;
}
- public TajoWorkerManagerService getTajoWorkerManagerService() {
- return tajoWorkerManagerService;
+ public ServiceTracker getServiceTracker() {
+ return serviceTracker;
}
public QueryMasterManagerService getQueryMasterManagerService() {
return queryMasterManagerService;
}
- public TajoWorkerClientService getTajoWorkerClientService() {
- return tajoWorkerClientService;
- }
-
public TaskRunnerManager getTaskRunnerManager() {
return taskRunnerManager;
}
@@ -521,10 +521,6 @@ public class TajoWorker extends CompositeService {
TajoWorker.this.numClusterNodes.set(numClusterNodes);
}
- public int getNumClusterNodes() {
- return TajoWorker.this.numClusterNodes.get();
- }
-
public void setClusterResource(ClusterResourceSummary clusterResource) {
synchronized (numClusterNodes) {
TajoWorker.this.clusterResource = clusterResource;
@@ -537,26 +533,6 @@ public class TajoWorker extends CompositeService {
}
}
- public InetSocketAddress getTajoMasterAddress() {
- return tajoMasterInfo.getTajoMasterAddress();
- }
-
- public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
- tajoMasterInfo.setTajoMasterAddress(tajoMasterAddress);
- }
-
- public InetSocketAddress getResourceTrackerAddress() {
- return tajoMasterInfo.getWorkerResourceTrackerAddr();
- }
-
- public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
- tajoMasterInfo.setWorkerResourceTrackerAddr(workerResourceTrackerAddr);
- }
-
- public int getPeerRpcPort() {
- return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort();
- }
-
public boolean isQueryMasterMode() {
return queryMasterMode;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 0b815d8..c0a6453 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -119,7 +119,7 @@ public class TajoWorkerClientService extends AbstractService {
QueryId queryId = new QueryId(request.getQueryId());
QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
- QueryHistory queryHistory;
+ QueryHistory queryHistory = null;
if (queryMasterTask == null) {
queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString());
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 676c72b..870e9a0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -35,6 +35,7 @@ import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.storage.DiskDeviceInfo;
import org.apache.tajo.storage.DiskMountInfo;
import org.apache.tajo.storage.DiskUtil;
@@ -183,22 +184,9 @@ public class WorkerHeartbeatService extends AbstractService {
try {
CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
- } catch (Exception e) {
- context.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
- context.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
- }
- } else {
- rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
- }
-
+ ServiceTracker serviceTracker = context.getServiceTracker();
+ rmClient = connectionPool.getConnection(serviceTracker.getResourceTrackerAddress(),
+ TajoResourceTrackerProtocol.class, true);
TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 9d9f39c..4b76c73 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -26,6 +26,8 @@ import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rule.*;
import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
@@ -45,13 +47,8 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
InetSocketAddress masterAddress = null;
try {
- if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- masterAddress = HAServiceUtil.getMasterUmbilicalAddress(tajoConf);
- } else {
- masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
- }
- masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
-
+ ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf);
+ masterClient = pool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
masterClient.getStub();
} finally {
if (masterClient != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
index bc770d7..1ff81a6 100644
--- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
@@ -24,12 +24,13 @@
<%@ page import="org.apache.tajo.catalog.TableDesc" %>
<%@ page import="org.apache.tajo.catalog.partition.PartitionMethodDesc" %>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%@ page import="org.apache.tajo.util.FileUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.util.Collection" %>
<%@ page import="java.util.List" %>
<%@ page import="java.util.Map" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
CatalogService catalog = master.getCatalog();
@@ -59,7 +60,7 @@
//TODO filter with database
Collection<String> tableNames = catalog.getAllTableNames(selectedDatabase);
- HAService haService = master.getContext().getHAService();
+ ServiceTracker haService = master.getContext().getHAService();
String activeLabel = "";
if (haService != null) {
if (haService.isActiveStatus()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 1fb5e40..aca1153 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -21,8 +21,8 @@
<%@ page import="org.apache.tajo.master.TajoMaster" %>
<%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
-<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
@@ -30,6 +30,7 @@
<%@ page import="org.apache.tajo.util.TUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.util.*" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -75,7 +76,7 @@
String deadWorkersHtml = deadWorkers.isEmpty() ? "0": "<font color='red'>" + deadWorkers.size() + "</font>";
String deadQueryMastersHtml = deadQueryMasters.isEmpty() ? "0": "<font color='red'>" + deadQueryMasters.size() + "</font>";
- HAService haService = master.getContext().getHAService();
+ ServiceTracker haService = master.getContext().getHAService();
List<TajoMasterInfo> masters = TUtil.newList();
String activeLabel = "";
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index d98abfd..0a0558e 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -23,8 +23,8 @@
<%@ page import="org.apache.tajo.conf.TajoConf" %>
<%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
-<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
@@ -35,6 +35,7 @@
<%@ page import="java.util.Collection" %>
<%@ page import="java.util.Date" %>
<%@ page import="java.util.Map" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -85,7 +86,7 @@
String numDeadWorkersHtml = numDeadWorkers == 0 ? "0" : "<font color='red'>" + numDeadWorkers + "</font>";
String numDeadQueryMastersHtml = numDeadQueryMasters == 0 ? "0" : "<font color='red'>" + numDeadQueryMasters + "</font>";
- HAService haService = master.getContext().getHAService();
+ ServiceTracker haService = master.getContext().getHAService();
List<TajoMasterInfo> masters = TUtil.newList();
String activeLabel = "";
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
index 82836ac..a0f9a0a 100644
--- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
@@ -19,13 +19,14 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="javax.xml.ws.Service" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- HAService haService = master.getContext().getHAService();
+ ServiceTracker haService = master.getContext().getHAService();
String activeLabel = "";
if (haService != null) {
if (haService.isActiveStatus()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index a6f9f74..f8642ed 100644
--- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -18,8 +18,6 @@
package org.apache.tajo.ha;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
@@ -29,6 +27,8 @@ import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import org.junit.Test;
import static junit.framework.TestCase.assertEquals;
@@ -37,8 +37,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class TestHAServiceHDFSImpl {
- private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class);
-
private TajoTestingCluster cluster;
private TajoMaster backupMaster;
@@ -60,7 +58,8 @@ public class TestHAServiceHDFSImpl {
try {
FileSystem fs = cluster.getDefaultFileSystem();
- masterAddress = HAServiceUtil.getMasterUmbilicalName(conf).split(":")[0];
+ ServiceTracker serviceTracker = ServiceTrackerFactory.get(conf);
+ masterAddress = serviceTracker.getUmbilicalAddress().getHostName();
setConfiguration();
@@ -112,6 +111,7 @@ public class TestHAServiceHDFSImpl {
masterAddress + ":" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS,
masterAddress + ":" + NetUtils.getFreeSocketPort());
+
conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true);
//Client API service RPC Server
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index f00dc25..2264b62 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -23,12 +23,11 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.TajoConstants;
-import org.apache.tajo.client.CatalogAdminClient;
-import org.apache.tajo.client.QueryClient;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.jdbc.util.QueryStringDecoder;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.URI;
@@ -109,7 +108,8 @@ public class JdbcConnection implements Connection {
}
try {
- tajoClient = new TajoClientImpl(hostName, port, databaseName);
+ ServiceTracker serviceTracker = new DummyServiceTracker(NetUtils.createSocketAddr(hostName, port));
+ tajoClient = new TajoClientImpl(tajoConf, serviceTracker, databaseName);
} catch (Exception e) {
throw new SQLException("Cannot create TajoClient instance:" + e.getMessage(), "TAJO-002");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index eb7f8c9..7f89b46 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -118,7 +118,7 @@ public class TajoStatement implements Statement {
@Override
public ResultSet executeQuery(String sql) throws SQLException {
if (isClosed) {
- throw new SQLFeatureNotSupportedException("Can't execute after statement has been closed");
+ throw new SQLException("Can't execute after statement has been closed");
}
try {
@@ -130,7 +130,7 @@ public class TajoStatement implements Statement {
return tajoClient.executeQueryAndGetResult(sql);
}
} catch (Exception e) {
- throw new SQLFeatureNotSupportedException(e.getMessage(), e);
+ throw new SQLException(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index af3d623..68e96d8 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -38,14 +38,6 @@ import java.util.ArrayList;
import java.util.List;
public class StorageUtil extends StorageConstants {
- public static int getRowByteSize(Schema schema) {
- int sum = 0;
- for(Column col : schema.getColumns()) {
- sum += StorageUtil.getColByteSize(col);
- }
-
- return sum;
- }
public static int getColByteSize(Column col) {
switch (col.getDataType().getType()) {
@@ -83,14 +75,6 @@ public class StorageUtil extends StorageConstants {
return 0;
}
}
-
- public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
- FileSystem fs = tableroot.getFileSystem(conf);
- FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
- FileUtil.writeProto(out, meta.getProto());
- out.flush();
- out.close();
- }
public static Path concatPath(String parent, String...childs) {
return concatPath(new Path(parent), childs);
[10/10] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/31698f53
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/31698f53
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/31698f53
Branch: refs/heads/index_support
Commit: 31698f537f8c19ee0fc16968add6e29229ddc2d0
Parents: 255046b 58bbb1b
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Feb 2 21:28:56 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Feb 2 21:28:56 2015 +0900
----------------------------------------------------------------------
CHANGES | 9 ++
.../apache/tajo/catalog/CatalogConstants.java | 2 +
.../org/apache/tajo/cli/tools/TajoDump.java | 16 +++-
.../cli/tsql/DefaultTajoCliOutputFormatter.java | 3 +-
.../org/apache/tajo/master/QueryInProgress.java | 2 +-
.../org/apache/tajo/master/QueryManager.java | 4 -
.../tajo/master/TajoMasterClientService.java | 8 +-
.../apache/tajo/querymaster/QueryMaster.java | 1 -
.../java/org/apache/tajo/querymaster/Stage.java | 23 ++++-
.../main/proto/QueryCoordinatorProtocol.proto | 1 -
.../org/apache/tajo/TajoTestingCluster.java | 96 ++++++++------------
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 42 +++------
.../tajo/engine/query/TestInsertQuery.java | 3 +-
.../apache/tajo/querymaster/TestKillQuery.java | 20 +++-
.../TestTajoCli/testNonForwardQueryPause.result | 3 +-
.../org/apache/tajo/storage/StorageManager.java | 40 ++++----
.../tajo/storage/TestFileStorageManager.java | 60 +++++++++---
17 files changed, 192 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/31698f53/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index c7df801,a8c5c9b..f19f77f
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@@ -52,6 -52,7 +52,8 @@@ public class CatalogConstants
public static final String COL_TABLESPACE_PK = "SPACE_ID";
public static final String COL_DATABASES_PK = "DB_ID";
public static final String COL_TABLES_PK = "TID";
+ public static final String COL_INDEXES_PK = "INDEX_ID";
public static final String COL_TABLES_NAME = "TABLE_NAME";
+
+ public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema";
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/31698f53/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/31698f53/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
[07/10] tajo git commit: TAJO-1323: Cleanup the unstable test case.
(jinho)
Posted by ji...@apache.org.
TAJO-1323: Cleanup the unstable test case. (jinho)
Closes #368
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fd73074f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fd73074f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fd73074f
Branch: refs/heads/index_support
Commit: fd73074f2198f92516a3eb8f5e786d31c1c071a5
Parents: e656ee2
Author: jhkim <jh...@apache.org>
Authored: Mon Feb 2 14:56:14 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Feb 2 14:56:14 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../cli/tsql/DefaultTajoCliOutputFormatter.java | 3 +-
.../java/org/apache/tajo/querymaster/Stage.java | 23 ++++-
.../org/apache/tajo/TajoTestingCluster.java | 96 ++++++++------------
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 42 +++------
.../tajo/engine/query/TestInsertQuery.java | 3 +-
.../apache/tajo/querymaster/TestKillQuery.java | 20 +++-
.../TestTajoCli/testNonForwardQueryPause.result | 3 +-
8 files changed, 98 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4a3715b..8466a38 100644
--- a/CHANGES
+++ b/CHANGES
@@ -327,6 +327,8 @@ Release 0.10.0 - unreleased
TASKS
+ TAJO-1323: Cleanup the unstable test case. (jinho)
+
TAJO-1295: Remove legacy worker.dataserver package and its unit tests.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
index 17c94b9..5cbe77b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java
@@ -37,6 +37,7 @@ public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter {
private boolean printPause;
private boolean printErrorTrace;
private String nullChar;
+ public static char QUIT_COMMAND = 'q';
@Override
public void init(TajoCli.TajoCliContext context) {
@@ -123,7 +124,7 @@ public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter {
}
sout.flush();
if (sin != null) {
- if (sin.read() == 'q') {
+ if (sin.read() == QUIT_COMMAND) {
endOfTuple = false;
sout.println();
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 208d4a6..5673d5b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -185,7 +185,7 @@ public class Stage implements EventHandler<StageEvent> {
// Transitions from KILL_WAIT state
.addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
- StageEventType.SQ_CONTAINER_ALLOCATED,
+ EnumSet.of(StageEventType.SQ_START, StageEventType.SQ_CONTAINER_ALLOCATED),
CONTAINERS_CANCEL_TRANSITION)
.addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition())
@@ -414,6 +414,18 @@ public class Stage implements EventHandler<StageEvent> {
return totalScheduledObjectsCount;
}
+ public int getKilledObjectCount() {
+ return killedObjectCount;
+ }
+
+ public int getFailedObjectCount() {
+ return failedObjectCount;
+ }
+
+ public int getCompletedTaskCount() {
+ return completedTaskCount;
+ }
+
public ExecutionBlock getBlock() {
return block;
}
@@ -793,7 +805,14 @@ public class Stage implements EventHandler<StageEvent> {
stage.taskScheduler.start();
allocateContainers(stage);
} else {
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+ /* all tasks are killed before stage are inited */
+ if (stage.getTotalScheduledObjectsCount() == stage.getCompletedTaskCount()) {
+ stage.eventHandler.handle(
+ new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ } else {
+ stage.eventHandler.handle(
+ new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+ }
}
}
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 8714fc4..0d3d660 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -77,17 +77,13 @@ public class TajoTestingCluster {
private TajoMaster tajoMaster;
private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
private boolean standbyWorkerMode = false;
+ private boolean isDFSRunning = false;
+ private boolean isTajoClusterRunning = false;
+ private boolean isCatalogServerRunning = false;
- // If non-null, then already a cluster running.
private File clusterTestBuildDir = null;
/**
- * System property key to get test directory value.
- * Name is as it is because mini dfs has hard-codings to put test data here.
- */
- public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA;
-
- /**
* Default parent directory for test output.
*/
public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
@@ -111,6 +107,7 @@ public class TajoTestingCluster {
this.conf = new TajoConf();
this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode);
+ initTestDir();
setTestingFlagProperties();
initPropertiesAndConfigs();
}
@@ -177,22 +174,19 @@ public class TajoTestingCluster {
return this.conf;
}
- public void initTestDir() {
- if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
- clusterTestBuildDir = setupClusterTestBuildDir();
- System.setProperty(TEST_DIRECTORY_KEY,
- clusterTestBuildDir.getAbsolutePath());
- }
- }
+ public void initTestDir() {
+ if (clusterTestBuildDir == null) {
+ clusterTestBuildDir = setupClusterTestBuildDir();
+ }
+ }
/**
* @return Where to write test data on local filesystem; usually
* {@link #DEFAULT_TEST_DIRECTORY}
* @see #setupClusterTestBuildDir()
*/
- public static File getTestDir() {
- return new File(System.getProperty(TEST_DIRECTORY_KEY,
- DEFAULT_TEST_DIRECTORY));
+ public File getTestDir() {
+ return clusterTestBuildDir;
}
/**
@@ -202,10 +196,10 @@ public class TajoTestingCluster {
* @see #setupClusterTestBuildDir()
*/
public static File getTestDir(final String subdirName) {
- return new File(getTestDir(), subdirName);
+ return new File(new File(DEFAULT_TEST_DIRECTORY), subdirName);
}
- public File setupClusterTestBuildDir() {
+ public static File setupClusterTestBuildDir() {
String randomStr = UUID.randomUUID().toString();
String dirStr = getTestDir(randomStr).toString();
File dir = new File(dirStr).getAbsoluteFile();
@@ -243,9 +237,6 @@ public class TajoTestingCluster {
File dir,
final String hosts[])
throws IOException {
- if (dir == null) {
- dir = setupClusterTestBuildDir();
- }
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.toString());
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
@@ -263,7 +254,7 @@ public class TajoTestingCluster {
this.defaultFS = this.dfsCluster.getFileSystem();
this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
-
+ isDFSRunning = true;
return this.dfsCluster;
}
@@ -300,22 +291,20 @@ public class TajoTestingCluster {
// Catalog Section
////////////////////////////////////////////////////////
public MiniCatalogServer startCatalogCluster() throws Exception {
- TajoConf c = getConfiguration();
+ if(isCatalogServerRunning) throw new IOException("Catalog Cluster already running");
- if(clusterTestBuildDir == null) {
- clusterTestBuildDir = setupClusterTestBuildDir();
- }
+ TajoConf c = getConfiguration();
conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
conf.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db");
- LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
+ LOG.info("Apache Derby repository is set to " + conf.get(CatalogConstants.CATALOG_URI));
conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
catalogServer = new MiniCatalogServer(conf);
CatalogServer catServer = catalogServer.getCatalogServer();
InetSocketAddress sockAddr = catServer.getBindAddress();
c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
-
+ isCatalogServerRunning = true;
return this.catalogServer;
}
@@ -323,6 +312,7 @@ public class TajoTestingCluster {
if (catalogServer != null) {
this.catalogServer.shutdown();
}
+ isCatalogServerRunning = false;
}
public MiniCatalogServer getMiniCatalogCluster() {
@@ -352,10 +342,10 @@ public class TajoTestingCluster {
c.setVar(ConfVars.ROOT_DIR,
getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
} else {
- c.setVar(ConfVars.ROOT_DIR, clusterTestBuildDir.getAbsolutePath() + "/tajo");
+ c.setVar(ConfVars.ROOT_DIR, testBuildDir.getAbsolutePath() + "/tajo");
}
- setupCatalogForTesting(c, clusterTestBuildDir);
+ setupCatalogForTesting(c, testBuildDir);
tajoMaster = new TajoMaster();
tajoMaster.init(c);
@@ -374,6 +364,7 @@ public class TajoTestingCluster {
if(standbyWorkerMode) {
startTajoWorkers(numSlaves);
}
+ isTajoClusterRunning = true;
LOG.info("Mini Tajo cluster is up");
LOG.info("====================================================================================");
LOG.info("= MiniTajoCluster starts up =");
@@ -473,8 +464,8 @@ public class TajoTestingCluster {
/**
* @throws java.io.IOException If a cluster -- dfs or engine -- already running.
*/
- void isRunningCluster(String passedBuildPath) throws IOException {
- if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
+ void isRunningCluster() throws IOException {
+ if (!isTajoClusterRunning && !isCatalogServerRunning && !isDFSRunning) return;
throw new IOException("Cluster already running at " +
this.clusterTestBuildDir);
}
@@ -501,19 +492,13 @@ public class TajoTestingCluster {
LOG.info("Starting up minicluster with 1 master(s) and " +
numSlaves + " worker(s) and " + numDataNodes + " datanode(s)");
- // If we already put up a cluster, fail.
- String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
- isRunningCluster(testBuildPath);
- if (testBuildPath != null) {
- LOG.info("Using passed path: " + testBuildPath);
+ // If we already bring up the cluster, fail.
+ isRunningCluster();
+ if (clusterTestBuildDir != null) {
+ LOG.info("Using passed path: " + clusterTestBuildDir);
}
- // Make a new random dir to home everything in. Set it as system property.
- // minidfs reads home from system property.
- this.clusterTestBuildDir = testBuildPath == null?
- setupClusterTestBuildDir() : new File(testBuildPath);
-
- startMiniDFSCluster(numDataNodes, setupClusterTestBuildDir(), dataNodeHosts);
+ startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts);
this.dfsCluster.waitClusterUp();
hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
@@ -559,20 +544,11 @@ public class TajoTestingCluster {
}
public void startMiniClusterInLocal(final int numSlaves) throws Exception {
- // If we already put up a cluster, fail.
- String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
- isRunningCluster(testBuildPath);
- if (testBuildPath != null) {
- LOG.info("Using passed path: " + testBuildPath);
- }
-
- // Make a new random dir to home everything in. Set it as system property.
- // minidfs reads home from system property.
- this.clusterTestBuildDir = testBuildPath == null?
- setupClusterTestBuildDir() : new File(testBuildPath);
+ isRunningCluster();
- System.setProperty(TEST_DIRECTORY_KEY,
- this.clusterTestBuildDir.getAbsolutePath());
+ if (clusterTestBuildDir != null) {
+ LOG.info("Using passed path: " + clusterTestBuildDir);
+ }
startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true);
}
@@ -592,6 +568,7 @@ public class TajoTestingCluster {
if(this.catalogServer != null) {
shutdownCatalogCluster();
+ isCatalogServerRunning = false;
}
if(this.yarnCluster != null) {
@@ -612,6 +589,7 @@ public class TajoTestingCluster {
} catch (IOException e) {
System.err.println("error closing file system: " + e);
}
+ isDFSRunning = false;
}
if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
@@ -630,6 +608,7 @@ public class TajoTestingCluster {
}
LOG.info("Minicluster is down");
+ isTajoClusterRunning = false;
}
public static TajoClient newTajoClient() throws Exception {
@@ -803,7 +782,8 @@ public class TajoTestingCluster {
} catch (InterruptedException e) {
}
if (++i > 200) {
- throw new IOException("Timed out waiting");
+ throw new IOException("Timed out waiting. expected: " + expected +
+ ", actual: " + query != null ? String.valueOf(query.getSynchronizedState()) : String.valueOf(query));
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index aff1677..e014b52 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -38,9 +38,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintWriter;
+import java.io.*;
import java.net.URL;
import static org.junit.Assert.*;
@@ -77,7 +75,8 @@ public class TestTajoCli {
}
@After
- public void tearDown() {
+ public void tearDown() throws IOException {
+ out.close();
if (tajoCli != null) {
tajoCli.close();
}
@@ -350,38 +349,27 @@ public class TestTajoCli {
assertOutputResult(new String(out.toByteArray()));
}
- @Test
+ @Test(timeout = 3000)
public void testNonForwardQueryPause() throws Exception {
final String sql = "select * from default.lineitem";
+ TajoCli cli = null;
try {
TableDesc tableDesc = cluster.getMaster().getCatalog().getTableDesc("default", "lineitem");
assertNotNull(tableDesc);
assertEquals(0L, tableDesc.getStats().getNumRows().longValue());
- setVar(tajoCli, SessionVars.CLI_PAGE_ROWS, "2");
- setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
- Thread t = new Thread() {
- public void run() {
- try {
- tajoCli.executeScript(sql);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
- };
- t.start();
+
+ InputStream testInput = new ByteArrayInputStream(new byte[]{(byte) DefaultTajoCliOutputFormatter.QUIT_COMMAND});
+ cli = new TajoCli(cluster.getConfiguration(), new String[]{}, testInput, out);
+ setVar(cli, SessionVars.CLI_PAGE_ROWS, "2");
+ setVar(cli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
+
+ cli.executeScript(sql);
+
String consoleResult;
- while (true) {
- Thread.sleep(3 * 1000);
- consoleResult = new String(out.toByteArray());
- if (consoleResult.indexOf("row") >= 0) {
- t.interrupt();
- break;
- }
- }
+ consoleResult = new String(out.toByteArray());
assertOutputResult(consoleResult);
} finally {
- setVar(tajoCli, SessionVars.CLI_PAGE_ROWS, "100");
+ cli.close();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index cc7dced..0799d22 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -660,12 +660,11 @@ public class TestInsertQuery extends QueryTestCaseBase {
@Test
public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception {
- String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+ String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery2");
ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
-
res = executeString("insert overwrite into " + tableName + " (col1, col3) select 1::INT4, 'test';");
res.close();
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 42ad8da..0574bea 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -127,12 +127,24 @@ public class TestKillQuery {
Query q = queryMasterTask.getQuery();
q.handle(new QueryEvent(queryId, QueryEventType.KILL));
- try{
+ try {
cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
- } finally {
assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (stage != null) {
+ System.err.println(String.format("Stage: [%s] (Total: %d, Complete: %d, Success: %d, Killed: %d, Failed: %d)",
+ stage.getId().toString(),
+ stage.getTotalScheduledObjectsCount(),
+ stage.getCompletedTaskCount(),
+ stage.getSucceededObjectCount(),
+ stage.getKilledObjectCount(),
+ stage.getFailedObjectCount()));
+ }
+ throw e;
+ } finally {
+ queryMasterTask.stop();
}
- queryMasterTask.stop();
}
@Test
@@ -145,6 +157,8 @@ public class TestKillQuery {
QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
Query query = qmt.getQuery();
+ // wait for a stage created
+ cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
query.handle(new QueryEvent(queryId, QueryEventType.KILL));
try{
http://git-wip-us.apache.org/repos/asf/tajo/blob/fd73074f/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result b/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
index e9485d0..d4ba604 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testNonForwardQueryPause.result
@@ -2,4 +2,5 @@ l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice
-------------------------------
1, 1, 7706, 1, 17.0, 21168.23, 0.04, 0.02, N, O, 1996-03-13, 1996-02-12, 1996-03-22, DELIVER IN PERSON, TRUCK, egular courts above the
1, 1, 7311, 2, 36.0, 45983.16, 0.09, 0.06, N, O, 1996-04-12, 1996-02-28, 1996-04-20, TAKE BACK RETURN, MAIL, ly final dependencies: slyly bold
-(2 rows, continue... 'q' is quit)
\ No newline at end of file
+(2 rows, continue... 'q' is quit)
+(unknown row number, , 604 B selected)
\ No newline at end of file
[04/10] tajo git commit: TAJO-1306: HAServiceUtil should not directly
use HDFS.
Posted by ji...@apache.org.
TAJO-1306: HAServiceUtil should not directly use HDFS.
Closes #358
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4595375f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4595375f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4595375f
Branch: refs/heads/index_support
Commit: 4595375f7e6b62436e0d4bf88a8aef1ca680c726
Parents: 015913b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 28 09:23:20 2015 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 28 09:23:20 2015 -0800
----------------------------------------------------------------------
.../tajo/catalog/AbstractCatalogClient.java | 12 +-
.../apache/tajo/client/DummyServiceTracker.java | 84 +++
.../apache/tajo/client/SessionConnection.java | 44 +-
.../org/apache/tajo/client/TajoClientImpl.java | 38 +-
.../apache/tajo/client/TajoHAClientUtil.java | 14 +-
.../java/org/apache/tajo/conf/TajoConf.java | 7 +-
.../java/org/apache/tajo/ha/HAServiceUtil.java | 39 --
.../apache/tajo/service/BaseServiceTracker.java | 97 ++++
.../apache/tajo/service/HAServiceTracker.java | 48 ++
.../org/apache/tajo/service/ServiceTracker.java | 63 ++
.../tajo/service/ServiceTrackerException.java | 30 +
.../tajo/service/ServiceTrackerFactory.java | 41 ++
.../org/apache/tajo/service/TajoMasterInfo.java | 89 +++
.../org/apache/tajo/benchmark/BenchmarkSet.java | 15 +-
.../main/java/org/apache/tajo/ha/HAService.java | 56 --
.../org/apache/tajo/ha/HAServiceHDFSImpl.java | 316 ----------
.../org/apache/tajo/ha/HdfsServiceTracker.java | 576 +++++++++++++++++++
.../java/org/apache/tajo/ha/TajoMasterInfo.java | 89 ---
.../apache/tajo/master/TajoContainerProxy.java | 27 +-
.../java/org/apache/tajo/master/TajoMaster.java | 30 +-
.../apache/tajo/querymaster/QueryMaster.java | 66 +--
.../main/java/org/apache/tajo/util/JSPUtil.java | 12 +-
.../tajo/worker/TajoResourceAllocator.java | 28 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 46 +-
.../tajo/worker/TajoWorkerClientService.java | 2 +-
.../tajo/worker/WorkerHeartbeatService.java | 20 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 11 +-
.../resources/webapps/admin/catalogview.jsp | 5 +-
.../main/resources/webapps/admin/cluster.jsp | 7 +-
.../src/main/resources/webapps/admin/index.jsp | 7 +-
.../resources/webapps/admin/query_executor.jsp | 5 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 10 +-
.../org/apache/tajo/jdbc/JdbcConnection.java | 10 +-
.../org/apache/tajo/jdbc/TajoStatement.java | 4 +-
.../org/apache/tajo/storage/StorageUtil.java | 16 -
35 files changed, 1156 insertions(+), 808 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 1a2fd44..718f7d6 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,12 +29,13 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.ProtoUtil;
import java.net.InetSocketAddress;
@@ -48,6 +49,7 @@ import java.util.List;
public abstract class AbstractCatalogClient implements CatalogService {
private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
+ protected ServiceTracker serviceTracker;
protected RpcConnectionPool pool;
protected InetSocketAddress catalogServerAddr;
protected TajoConf conf;
@@ -57,6 +59,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
this.pool = RpcConnectionPool.getPool();
this.catalogServerAddr = catalogServerAddr;
+ this.serviceTracker = ServiceTrackerFactory.get(conf);
this.conf = conf;
}
@@ -64,14 +67,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
if (catalogServerAddr == null) {
return null;
} else {
+
if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
return catalogServerAddr;
} else {
- if (!HAServiceUtil.isMasterAlive(catalogServerAddr, conf)) {
- return HAServiceUtil.getCatalogAddress(conf);
- } else {
- return catalogServerAddr;
- }
+ return serviceTracker.getCatalogAddress();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
new file mode 100644
index 0000000..762c2e7
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class DummyServiceTracker implements ServiceTracker {
+ private InetSocketAddress address;
+
+ public DummyServiceTracker(InetSocketAddress address) {
+ this.address = address;
+ }
+
+ @Override
+ public boolean isHighAvailable() {
+ return false;
+ }
+
+ @Override
+ public InetSocketAddress getUmbilicalAddress() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public InetSocketAddress getClientServiceAddress() {
+ return address;
+ }
+
+ @Override
+ public InetSocketAddress getResourceTrackerAddress() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public InetSocketAddress getCatalogAddress() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void register() throws IOException {
+ }
+
+ @Override
+ public void delete() throws IOException {
+ }
+
+ @Override
+ public boolean isActiveStatus() {
+ return true;
+ }
+
+ @Override
+ public List<TajoMasterInfo> getMasters() throws IOException {
+ throw new UnsupportedException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 5490be4..3e2b9cc 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,12 +21,10 @@ package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -34,6 +32,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
@@ -58,8 +57,6 @@ public class SessionConnection implements Closeable {
private final TajoConf conf;
- final InetSocketAddress tajoMasterAddr;
-
final RpcConnectionPool connPool;
private final String baseDatabase;
@@ -73,41 +70,29 @@ public class SessionConnection implements Closeable {
/** session variable cache */
private final Map<String, String> sessionVarsCache = new HashMap<String, String>();
-
- public SessionConnection(TajoConf conf) throws IOException {
- this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
- }
-
- public SessionConnection(TajoConf conf, @Nullable String baseDatabase) throws IOException {
- this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
- }
-
- public SessionConnection(InetSocketAddress addr) throws IOException {
- this(new TajoConf(), addr, null);
- }
-
- public SessionConnection(String hostname, int port, String baseDatabase) throws IOException {
- this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase);
- }
+ private ServiceTracker serviceTracker;
/**
* Connect to TajoMaster
*
* @param conf TajoConf
- * @param addr TajoMaster address
+ * @param tracker TajoMaster address
* @param baseDatabase The base database name. It is case sensitive. If it is null,
* the 'default' database will be used.
* @throws java.io.IOException
*/
- public SessionConnection(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
+ public SessionConnection(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase)
+ throws IOException {
+
this.conf = conf;
this.conf.set("tajo.disk.scheduler.report.interval", "0");
- this.tajoMasterAddr = addr;
int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
// Don't share connection pool per client
connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
userInfo = UserRoleInfo.getCurrentUser();
this.baseDatabase = baseDatabase != null ? baseDatabase : null;
+
+ this.serviceTracker = tracker;
}
public Map<String, String> getClientSideSessionVars() {
@@ -140,7 +125,8 @@ public class SessionConnection implements Closeable {
public boolean isConnected() {
if(!closed.get()){
try {
- return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
+ return connPool.getConnection(serviceTracker.getClientServiceAddress(),
+ TajoMasterClientProtocol.class, false).isConnected();
} catch (Throwable e) {
return false;
}
@@ -309,15 +295,7 @@ public class SessionConnection implements Closeable {
}
protected InetSocketAddress getTajoMasterAddr() {
- if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return tajoMasterAddr;
- } else {
- if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
- return HAServiceUtil.getMasterClientAddress(conf);
- } else {
- return tajoMasterAddr;
- }
- }
+ return serviceTracker.getClientServiceAddress();
}
protected void checkSessionAndGet(NettyClientBase client) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 8eafc91..f8eef28 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -32,8 +32,6 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.jdbc.TajoResultSet;
@@ -41,7 +39,8 @@ import org.apache.tajo.rule.EvaluationContext;
import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
-import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -56,41 +55,30 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
QueryClient queryClient;
CatalogAdminClient catalogClient;
- public TajoClientImpl(TajoConf conf) throws IOException {
- this(conf, TajoHAClientUtil.getRpcClientAddress(conf), null);
- }
-
- public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
- this(conf, TajoHAClientUtil.getRpcClientAddress(conf), baseDatabase);
- }
-
- public TajoClientImpl(InetSocketAddress addr) throws IOException {
- this(new TajoConf(), addr, null);
- }
-
/**
* Connect to TajoMaster
*
* @param conf TajoConf
- * @param addr TajoMaster address
+ * @param tracker ServiceTracker to discovery Tajo Client RPC
* @param baseDatabase The base database name. It is case sensitive. If it is null,
* the 'default' database will be used.
* @throws java.io.IOException
*/
- public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
- super(conf, addr, baseDatabase);
+ public TajoClientImpl(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) throws IOException {
+ super(conf, tracker, baseDatabase);
+
this.queryClient = new QueryClientImpl(this);
this.catalogClient = new CatalogAdminClientImpl(this);
-
+
diagnoseTajoClient();
}
- public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException {
- super(hostName, port, baseDatabase);
- this.queryClient = new QueryClientImpl(this);
- this.catalogClient = new CatalogAdminClientImpl(this);
-
- diagnoseTajoClient();
+ public TajoClientImpl(TajoConf conf) throws IOException {
+ this(conf, ServiceTrackerFactory.get(conf), null);
+ }
+
+ public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
+ this(conf, ServiceTrackerFactory.get(conf), baseDatabase);
}
private void diagnoseTajoClient() throws EvaluationFailedException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
index 12a9ec8..7267b10 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -40,10 +40,8 @@ import com.google.protobuf.ServiceException;
import org.apache.tajo.cli.tsql.TajoCli.TajoCliContext;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.util.NetUtils;
import java.io.IOException;
-import java.net.InetSocketAddress;
public class TajoHAClientUtil {
/**
@@ -65,6 +63,7 @@ public class TajoHAClientUtil {
TajoCliContext context) throws IOException, ServiceException {
if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+
if (!HAServiceUtil.isMasterAlive(conf.getVar(
TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
TajoClient tajoClient = null;
@@ -85,15 +84,4 @@ public class TajoHAClientUtil {
return client;
}
}
-
-
- public static InetSocketAddress getRpcClientAddress(TajoConf conf) {
- if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return NetUtils.createSocketAddr(HAServiceUtil.getMasterClientName(conf));
- } else {
- return NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
- .TAJO_MASTER_CLIENT_RPC_ADDRESS));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1bb96bc..fe5ff54 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.ConfigKey;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.service.BaseServiceTracker;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.TUtil;
@@ -134,10 +135,14 @@ public class TajoConf extends Configuration {
Validators.networkAddr()),
TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()),
- // Tajo Master HA Configurations
+ // High availability configurations
TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
+ // Service discovery
+ DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()),
+ HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"),
+
// Resource tracker service
RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003",
Validators.networkAddr()),
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
index 52c2ade..7001228 100644
--- a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
@@ -18,8 +18,6 @@
package org.apache.tajo.ha;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf;
@@ -34,15 +32,6 @@ import java.util.ArrayList;
import java.util.List;
public class HAServiceUtil {
- private static Log LOG = LogFactory.getLog(HAServiceUtil.class);
-
- public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.MASTER_UMBILICAL_RPC_ADDRESS);
- }
-
- public static String getMasterUmbilicalName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
- }
public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS);
@@ -52,30 +41,6 @@ public class HAServiceUtil {
return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
}
- public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
- }
-
- public static String getResourceTrackerName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
- }
-
- public static InetSocketAddress getCatalogAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.CATALOG_ADDRESS);
- }
-
- public static String getCatalogName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
- }
-
- public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.MASTER_INFO_ADDRESS);
- }
-
- public static String getMasterInfoName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
- }
-
public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
InetSocketAddress masterAddress = null;
@@ -153,10 +118,6 @@ public class HAServiceUtil {
return masterAddress;
}
- public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
- return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
- }
-
public static boolean isMasterAlive(String masterName, TajoConf conf) {
boolean isAlive = true;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
new file mode 100644
index 0000000..bf7fd2c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class BaseServiceTracker implements ServiceTracker {
+ private final TajoConf conf;
+ private TajoMasterInfo tajoMasterInfo;
+ private List<TajoMasterInfo> tajoMasterInfos;
+
+ @SuppressWarnings("unused")
+ public BaseServiceTracker(TajoConf conf) {
+ this.conf = conf;
+
+ tajoMasterInfo = new TajoMasterInfo();
+ tajoMasterInfo.setActive(true);
+ tajoMasterInfo.setAvailable(true);
+ tajoMasterInfo.setTajoMasterAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+ tajoMasterInfo.setTajoClientAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(conf.getSocketAddrVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+ tajoMasterInfo.setCatalogAddress(conf.getSocketAddrVar(TajoConf.ConfVars.CATALOG_ADDRESS));
+ tajoMasterInfo.setWebServerAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS));
+
+ tajoMasterInfos = TUtil.newList(tajoMasterInfo);
+ }
+
+ @Override
+ public boolean isHighAvailable() {
+ return false;
+ }
+
+ @Override
+ public InetSocketAddress getUmbilicalAddress() {
+ return tajoMasterInfo.getTajoMasterAddress();
+ }
+
+ @Override
+ public InetSocketAddress getClientServiceAddress() {
+ return tajoMasterInfo.getTajoClientAddress();
+ }
+
+ @Override
+ public InetSocketAddress getResourceTrackerAddress() {
+ return tajoMasterInfo.getWorkerResourceTrackerAddr();
+ }
+
+ @Override
+ public InetSocketAddress getCatalogAddress() {
+ return tajoMasterInfo.getCatalogAddress();
+ }
+
+ @Override
+ public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+ return tajoMasterInfo.getWebServerAddress();
+ }
+
+ @Override
+ public void register() throws IOException {
+ }
+
+ @Override
+ public void delete() throws IOException {
+ }
+
+ @Override
+ public boolean isActiveStatus() {
+ return true;
+ }
+
+ @Override
+ public List<TajoMasterInfo> getMasters() throws IOException {
+ return tajoMasterInfos;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
new file mode 100644
index 0000000..c808537
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.hadoop.net.NetUtils;
+
+import javax.net.SocketFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public abstract class HAServiceTracker implements ServiceTracker {
+
+ static SocketFactory socketFactory = SocketFactory.getDefault();
+
+ public boolean isHighAvailable() {
+ return true;
+ }
+
+ public static boolean checkConnection(InetSocketAddress address) {
+ boolean isAlive = true;
+
+ try {
+ int connectionTimeout = 10;
+
+ Socket socket = socketFactory.createSocket();
+ NetUtils.connect(socket, address, connectionTimeout);
+ } catch (Exception e) {
+ isAlive = false;
+ }
+ return isAlive;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
new file mode 100644
index 0000000..73ff112
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public interface ServiceTracker {
+
+ public abstract boolean isHighAvailable();
+
+ public abstract InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getClientServiceAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getCatalogAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException;
+
+ /**
+ * Add master name to shared storage.
+ */
+ public void register() throws IOException;
+
+
+ /**
+ * Delete master name to shared storage.
+ *
+ */
+ public void delete() throws IOException;
+
+ /**
+ *
+ * @return True if current master is an active master.
+ */
+ public boolean isActiveStatus();
+
+ /**
+ *
+ * @return return all master list
+ * @throws IOException
+ */
+ public List<TajoMasterInfo> getMasters() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
new file mode 100644
index 0000000..3407c51
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+public class ServiceTrackerException extends RuntimeException {
+
+ public ServiceTrackerException(Throwable t) {
+ super(t);
+ }
+
+ public ServiceTrackerException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
new file mode 100644
index 0000000..5828055
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.ReflectionUtil;
+
+public class ServiceTrackerFactory {
+
+ public static ServiceTracker get(TajoConf conf) {
+ Class<ServiceTracker> trackerClass;
+
+ try {
+ if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.HA_SERVICE_TRACKER_CLASS);
+ } else {
+ trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.DEFAULT_SERVICE_TRACKER_CLASS);
+ }
+ return ReflectionUtil.newInstance(trackerClass, conf);
+
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
new file mode 100644
index 0000000..481b528
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.net.InetSocketAddress;
+
+public class TajoMasterInfo {
+
+ private boolean available;
+ private boolean isActive;
+
+ private InetSocketAddress tajoMasterAddress;
+ private InetSocketAddress tajoClientAddress;
+ private InetSocketAddress workerResourceTrackerAddr;
+ private InetSocketAddress catalogAddress;
+ private InetSocketAddress webServerAddress;
+
+ public InetSocketAddress getTajoMasterAddress() {
+ return tajoMasterAddress;
+ }
+
+ public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
+ this.tajoMasterAddress = tajoMasterAddress;
+ }
+
+ public InetSocketAddress getTajoClientAddress() {
+ return tajoClientAddress;
+ }
+
+ public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
+ this.tajoClientAddress = tajoClientAddress;
+ }
+
+ public InetSocketAddress getWorkerResourceTrackerAddr() {
+ return workerResourceTrackerAddr;
+ }
+
+ public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
+ this.workerResourceTrackerAddr = workerResourceTrackerAddr;
+ }
+
+ public InetSocketAddress getCatalogAddress() {
+ return catalogAddress;
+ }
+
+ public void setCatalogAddress(InetSocketAddress catalogAddress) {
+ this.catalogAddress = catalogAddress;
+ }
+
+ public InetSocketAddress getWebServerAddress() {
+ return webServerAddress;
+ }
+
+ public void setWebServerAddress(InetSocketAddress webServerAddress) {
+ this.webServerAddress = webServerAddress;
+ }
+
+ public boolean isAvailable() {
+ return available;
+ }
+
+ public void setAvailable(boolean available) {
+ this.available = available;
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public void setActive(boolean active) {
+ isActive = active;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index b1b6450..0304e92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -18,19 +18,23 @@
package org.apache.tajo.benchmark;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.catalog.CatalogConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.store.MemStore;
+import org.apache.tajo.client.DummyServiceTracker;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.FileUtil;
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
@@ -43,9 +47,14 @@ public abstract class BenchmarkSet {
public void init(TajoConf conf, String dataDir) throws IOException {
this.dataDir = dataDir;
- if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
- tajo = new TajoClientImpl(NetUtils.createSocketAddr(
- System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
+
+ if (System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname) != null) {
+
+ String addressStr = System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname);
+ InetSocketAddress addr = NetUtils.createSocketAddr(addressStr);
+ ServiceTracker serviceTracker = new DummyServiceTracker(addr);
+ tajo = new TajoClientImpl(conf, serviceTracker, null);
+
} else {
conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
tajo = new TajoClientImpl(conf);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
deleted file mode 100644
index 1329223..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The HAService is responsible for setting active TajoMaster on startup or when the
- * current active is changing (eg due to failure), monitoring the health of TajoMaster.
- *
- */
-public interface HAService {
-
- /**
- * Add master name to shared storage.
- */
- public void register() throws IOException;
-
-
- /**
- * Delete master name to shared storage.
- *
- */
- public void delete() throws IOException;
-
- /**
- *
- * @return True if current master is an active master.
- */
- public boolean isActiveStatus();
-
- /**
- *
- * @return return all master list
- * @throws IOException
- */
- public List<TajoMasterInfo> getMasters() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
deleted file mode 100644
index e18a9b2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
- *
- */
-public class HAServiceHDFSImpl implements HAService {
- private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
-
- private MasterContext context;
- private TajoConf conf;
-
- private FileSystem fs;
-
- private String masterName;
- private Path rootPath;
- private Path haPath;
- private Path activePath;
- private Path backupPath;
-
- private boolean isActiveStatus = false;
-
- //thread which runs periodically to see the last time since a heartbeat is received.
- private Thread checkerThread;
- private volatile boolean stopped = false;
-
- private int monitorInterval;
-
- private String currentActiveMaster;
-
- public HAServiceHDFSImpl(MasterContext context) throws IOException {
- this.context = context;
- this.conf = context.getConf();
- initSystemDirectory();
-
- InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
- this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
-
- monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
- }
-
- private void initSystemDirectory() throws IOException {
- // Get Tajo root dir
- this.rootPath = TajoConf.getTajoRootDir(conf);
-
- // Check Tajo root dir
- this.fs = rootPath.getFileSystem(conf);
-
- // Check and create Tajo system HA dir
- haPath = TajoConf.getSystemHADir(conf);
- if (!fs.exists(haPath)) {
- fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA dir '" + haPath + "' is created");
- }
-
- activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
- if (!fs.exists(activePath)) {
- fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA Active dir '" + activePath + "' is created");
- }
-
- backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
- if (!fs.exists(backupPath)) {
- fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA Backup dir '" + backupPath + "' is created");
- }
- }
-
- private void startPingChecker() {
- if (checkerThread == null) {
- checkerThread = new Thread(new PingChecker());
- checkerThread.setName("Ping Checker");
- checkerThread.start();
- }
- }
-
- @Override
- public void register() throws IOException {
- FileStatus[] files = fs.listStatus(activePath);
-
- // Phase 1: If there is not another active master, this try to become active master.
- if (files.length == 0) {
- createMasterFile(true);
- currentActiveMaster = masterName;
- LOG.info(String.format("This is added to active master (%s)", masterName));
- } else {
- // Phase 2: If there is active master information, we need to check its status.
- Path activePath = files[0].getPath();
- currentActiveMaster = activePath.getName().replaceAll("_", ":");
-
- // Phase 3: If current active master is dead, this master should be active master.
- if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
- fs.delete(activePath, true);
- createMasterFile(true);
- currentActiveMaster = masterName;
- LOG.info(String.format("This is added to active master (%s)", masterName));
- } else {
- // Phase 4: If current active master is alive, this master need to be backup master.
- createMasterFile(false);
- LOG.info(String.format("This is added to backup masters (%s)", masterName));
- }
- }
- }
-
- private void createMasterFile(boolean isActive) throws IOException {
- String fileName = masterName.replaceAll(":", "_");
- Path path = null;
-
- if (isActive) {
- path = new Path(activePath, fileName);
- } else {
- path = new Path(backupPath, fileName);
- }
-
- StringBuilder sb = new StringBuilder();
- InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.CATALOG_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
-
- FSDataOutputStream out = fs.create(path);
-
- try {
- out.writeUTF(sb.toString());
- out.hflush();
- out.close();
- } catch (FileAlreadyExistsException e) {
- createMasterFile(false);
- }
-
- if (isActive) {
- isActiveStatus = true;
- } else {
- isActiveStatus = false;
- }
-
- startPingChecker();
- }
-
-
- private InetSocketAddress getHostAddress(int type) {
- InetSocketAddress address = null;
-
- switch (type) {
- case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- break;
- case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_CLIENT_RPC_ADDRESS);
- break;
- case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS);
- break;
- case HAConstants.CATALOG_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .CATALOG_ADDRESS);
- break;
- case HAConstants.MASTER_INFO_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_INFO_ADDRESS);
- default:
- break;
- }
-
- return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
- }
-
- @Override
- public void delete() throws IOException {
- String fileName = masterName.replaceAll(":", "_");
-
- Path activeFile = new Path(activePath, fileName);
- if (fs.exists(activeFile)) {
- fs.delete(activeFile, true);
- }
-
- Path backupFile = new Path(backupPath, fileName);
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true);
- }
- if (isActiveStatus) {
- isActiveStatus = false;
- }
- stopped = true;
- }
-
- @Override
- public boolean isActiveStatus() {
- return isActiveStatus;
- }
-
- @Override
- public List<TajoMasterInfo> getMasters() throws IOException {
- List<TajoMasterInfo> list = TUtil.newList();
- Path path = null;
-
- FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 1) {
- path = files[0].getPath();
- list.add(createTajoMasterInfo(path, true));
- }
-
- files = fs.listStatus(backupPath);
- for (FileStatus status : files) {
- path = status.getPath();
- list.add(createTajoMasterInfo(path, false));
- }
-
- return list;
- }
-
- private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
- String masterAddress = path.getName().replaceAll("_", ":");
- boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
-
- FSDataInputStream stream = fs.open(path);
- String data = stream.readUTF();
-
- stream.close();
-
- String[] addresses = data.split("_");
- TajoMasterInfo info = new TajoMasterInfo();
-
- info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
- info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
- info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
- info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
- info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
-
- info.setAvailable(isAlive);
- info.setActive(isActive);
-
- return info;
- }
-
- private class PingChecker implements Runnable {
- @Override
- public void run() {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- synchronized (HAServiceHDFSImpl.this) {
- try {
- if (!currentActiveMaster.equals(masterName)) {
- boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
- + ", isAlive:" + isAlive);
- }
-
- // If active master is dead, this master should be active master instead of
- // previous active master.
- if (!isAlive) {
- FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 0 || (files.length == 1
- && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
- delete();
- register();
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(monitorInterval);
- } catch (InterruptedException e) {
- LOG.info("PingChecker interrupted. - masterName:" + masterName);
- break;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
new file mode 100644
index 0000000..1475a5d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.service.HAServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+import org.apache.tajo.util.TUtil;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
+ *
+ */
+@SuppressWarnings("unused")
+public class HdfsServiceTracker extends HAServiceTracker {
+ private static Log LOG = LogFactory.getLog(HdfsServiceTracker.class);
+
+ private TajoConf conf;
+
+ private FileSystem fs;
+
+ private String masterName;
+ private Path rootPath;
+ private Path haPath;
+ private Path activePath;
+ private Path backupPath;
+
+ private boolean isActiveStatus = false;
+
+ //thread which runs periodically to see the last time since a heartbeat is received.
+ private Thread checkerThread;
+ private volatile boolean stopped = false;
+
+ private int monitorInterval;
+
+ private String currentActiveMaster;
+
+ public HdfsServiceTracker(TajoConf conf) throws IOException {
+ this.conf = conf;
+ initSystemDirectory();
+
+ InetSocketAddress socketAddress = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+ this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
+
+ monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+ }
+
+ private void initSystemDirectory() throws IOException {
+ // Get Tajo root dir
+ this.rootPath = TajoConf.getTajoRootDir(conf);
+
+ // Check Tajo root dir
+ this.fs = rootPath.getFileSystem(conf);
+
+ // Check and create Tajo system HA dir
+ haPath = TajoConf.getSystemHADir(conf);
+ if (!fs.exists(haPath)) {
+ fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+ LOG.info("System HA dir '" + haPath + "' is created");
+ }
+
+ activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+ if (!fs.exists(activePath)) {
+ fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+ LOG.info("System HA Active dir '" + activePath + "' is created");
+ }
+
+ backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+ if (!fs.exists(backupPath)) {
+ fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+ LOG.info("System HA Backup dir '" + backupPath + "' is created");
+ }
+ }
+
+ private void startPingChecker() {
+ if (checkerThread == null) {
+ checkerThread = new Thread(new PingChecker());
+ checkerThread.setName("Ping Checker");
+ checkerThread.start();
+ }
+ }
+
+ @Override
+ public void register() throws IOException {
+ FileStatus[] files = fs.listStatus(activePath);
+
+ // Phase 1: If there is not another active master, this try to become active master.
+ if (files.length == 0) {
+ createMasterFile(true);
+ currentActiveMaster = masterName;
+ LOG.info(String.format("This is added to active master (%s)", masterName));
+ } else {
+ // Phase 2: If there is active master information, we need to check its status.
+ Path activePath = files[0].getPath();
+ currentActiveMaster = activePath.getName().replaceAll("_", ":");
+
+ // Phase 3: If current active master is dead, this master should be active master.
+ if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
+ fs.delete(activePath, true);
+ createMasterFile(true);
+ currentActiveMaster = masterName;
+ LOG.info(String.format("This is added to active master (%s)", masterName));
+ } else {
+ // Phase 4: If current active master is alive, this master need to be backup master.
+ createMasterFile(false);
+ LOG.info(String.format("This is added to backup masters (%s)", masterName));
+ }
+ }
+ }
+
+ /**
+ * It will creates the following form string. It includes
+ *
+ * <pre>
+ * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT}
+ * </pre>
+ *
+ * @param isActive A boolean flag to indicate if it is for master or not.
+ * @throws IOException
+ */
+ private void createMasterFile(boolean isActive) throws IOException {
+ String fileName = masterName.replaceAll(":", "_");
+ Path path = null;
+
+ if (isActive) {
+ path = new Path(activePath, fileName);
+ } else {
+ path = new Path(backupPath, fileName);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+ address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+ address = getHostAddress(HAConstants.CATALOG_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+ address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
+
+ FSDataOutputStream out = fs.create(path);
+
+ try {
+ out.writeUTF(sb.toString());
+ out.hsync();
+ out.close();
+ } catch (FileAlreadyExistsException e) {
+ createMasterFile(false);
+ }
+
+ if (isActive) {
+ isActiveStatus = true;
+ } else {
+ isActiveStatus = false;
+ }
+
+ startPingChecker();
+ }
+
+
+ private InetSocketAddress getHostAddress(int type) {
+ InetSocketAddress address = null;
+
+ switch (type) {
+ case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
+ address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+ break;
+ case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
+ address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
+ break;
+ case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
+ address = conf.getSocketAddrVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+ break;
+ case HAConstants.CATALOG_ADDRESS:
+ address = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS);
+ break;
+ case HAConstants.MASTER_INFO_ADDRESS:
+ address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+ default:
+ break;
+ }
+
+ return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
+ }
+
+ @Override
+ public void delete() throws IOException {
+ String fileName = masterName.replaceAll(":", "_");
+
+ Path activeFile = new Path(activePath, fileName);
+ if (fs.exists(activeFile)) {
+ fs.delete(activeFile, true);
+ }
+
+ Path backupFile = new Path(backupPath, fileName);
+ if (fs.exists(backupFile)) {
+ fs.delete(backupFile, true);
+ }
+ if (isActiveStatus) {
+ isActiveStatus = false;
+ }
+ stopped = true;
+ }
+
+ @Override
+ public boolean isActiveStatus() {
+ return isActiveStatus;
+ }
+
+ @Override
+ public List<TajoMasterInfo> getMasters() throws IOException {
+ List<TajoMasterInfo> list = TUtil.newList();
+ Path path = null;
+
+ FileStatus[] files = fs.listStatus(activePath);
+ if (files.length == 1) {
+ path = files[0].getPath();
+ list.add(createTajoMasterInfo(path, true));
+ }
+
+ files = fs.listStatus(backupPath);
+ for (FileStatus status : files) {
+ path = status.getPath();
+ list.add(createTajoMasterInfo(path, false));
+ }
+
+ return list;
+ }
+
+ private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+ String masterAddress = path.getName().replaceAll("_", ":");
+ boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+
+ FSDataInputStream stream = fs.open(path);
+ String data = stream.readUTF();
+
+ stream.close();
+
+ String[] addresses = data.split("_");
+ TajoMasterInfo info = new TajoMasterInfo();
+
+ info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
+ info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
+ info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
+ info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
+ info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+
+ info.setAvailable(isAlive);
+ info.setActive(isActive);
+
+ return info;
+ }
+
+ private class PingChecker implements Runnable {
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ synchronized (HdfsServiceTracker.this) {
+ try {
+ if (!currentActiveMaster.equals(masterName)) {
+ boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
+ + ", isAlive:" + isAlive);
+ }
+
+ // If active master is dead, this master should be active master instead of
+ // previous active master.
+ if (!isAlive) {
+ FileStatus[] files = fs.listStatus(activePath);
+ if (files.length == 0 || (files.length == 1
+ && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
+ delete();
+ register();
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ Thread.sleep(monitorInterval);
+ } catch (InterruptedException e) {
+ LOG.info("PingChecker interrupted. - masterName:" + masterName);
+ break;
+ }
+ }
+ }
+ }
+
+ private final static int MASTER_UMBILICAL_RPC_ADDRESS = 0;
+ private final static int MASTER_CLIENT_RPC_ADDRESS = 1;
+ private final static int RESOURCE_TRACKER_RPC_ADDRESS = 2;
+ private final static int CATALOG_ADDRESS = 3;
+ private final static int MASTER_HTTP_INFO = 4;
+
+ private volatile InetSocketAddress umbilicalRpcAddr;
+ private volatile InetSocketAddress clientRpcAddr;
+ private volatile InetSocketAddress resourceTrackerRpcAddr;
+ private volatile InetSocketAddress catalogAddr;
+ private volatile InetSocketAddress masterHttpInfoAddr;
+
+ @Override
+ public InetSocketAddress getUmbilicalAddress() {
+ if (!checkConnection(umbilicalRpcAddr)) {
+ umbilicalRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_UMBILICAL_RPC_ADDRESS));
+ }
+
+ return umbilicalRpcAddr;
+ }
+
+ @Override
+ public InetSocketAddress getClientServiceAddress() {
+ if (!checkConnection(clientRpcAddr)) {
+ clientRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_CLIENT_RPC_ADDRESS));
+ }
+
+ return clientRpcAddr;
+ }
+
+ @Override
+ public InetSocketAddress getResourceTrackerAddress() {
+ if (!checkConnection(resourceTrackerRpcAddr)) {
+ resourceTrackerRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(RESOURCE_TRACKER_RPC_ADDRESS));
+ }
+
+ return resourceTrackerRpcAddr;
+ }
+
+ @Override
+ public InetSocketAddress getCatalogAddress() {
+ if (!checkConnection(catalogAddr)) {
+ catalogAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(CATALOG_ADDRESS));
+ }
+
+ return catalogAddr;
+ }
+
+ @Override
+ public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+ if (!checkConnection(masterHttpInfoAddr)) {
+ masterHttpInfoAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_HTTP_INFO));
+ }
+
+ return masterHttpInfoAddr;
+ }
+
+ /**
+ * Reads a text file stored in HDFS file, and then return all service addresses read from a HDFS file. *
+ *
+ * @param conf
+ * @return all service addresses
+ * @throws ServiceTrackerException
+ */
+ private static List<String> getAddressElements(TajoConf conf) throws ServiceTrackerException {
+
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activeMasterBaseDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+
+ if (!fs.exists(activeMasterBaseDir)) {
+ throw new ServiceTrackerException("No such active master base path: " + activeMasterBaseDir);
+ }
+ if (!fs.isDirectory(activeMasterBaseDir)) {
+ throw new ServiceTrackerException("Active master base path must be a directory.");
+ }
+
+ FileStatus[] files = fs.listStatus(activeMasterBaseDir);
+
+ if (files.length < 1) {
+ throw new ServiceTrackerException("No active master entry");
+ } else if (files.length > 1) {
+ throw new ServiceTrackerException("Two or more than active master entries.");
+ }
+
+ // We can ensure that there is only one file due to the above assertion.
+ Path activeMasterEntry = files[0].getPath();
+
+ if (!fs.isFile(activeMasterEntry)) {
+ throw new ServiceTrackerException("Active master entry must be a file, but it is a directory.");
+ }
+
+ List<String> addressElements = TUtil.newList();
+
+ addressElements.add(activeMasterEntry.getName().replaceAll("_", ":")); // Add UMBILICAL_RPC_ADDRESS to elements
+
+ FSDataInputStream stream = fs.open(activeMasterEntry);
+ String data = stream.readUTF();
+ stream.close();
+
+ addressElements.addAll(TUtil.newList(data.split("_"))); // Add remains entries to elements
+
+ // ensure the number of entries
+ Preconditions.checkState(addressElements.size() == 5, "Fewer service addresses than necessary.");
+
+ return addressElements;
+
+ } catch (Throwable t) {
+ throw new ServiceTrackerException(t);
+ }
+ }
+
+
+ public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
+ return isMasterAlive(org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(masterAddress), conf);
+ }
+
+ public static boolean isMasterAlive(String masterName, TajoConf conf) {
+ boolean isAlive = true;
+
+ try {
+ // how to create sockets
+ SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
+
+ int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
+
+ InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
+
+ // connected socket
+ Socket socket = socketFactory.createSocket();
+ org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
+ } catch (Exception e) {
+ isAlive = false;
+ }
+ return isAlive;
+ }
+
+ public static int getState(String masterName, TajoConf conf) {
+ String targetMaster = masterName.replaceAll(":", "_");
+ int retValue = -1;
+
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+ Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+ Path temPath = null;
+
+ // Check backup masters
+ FileStatus[] files = fs.listStatus(backupPath);
+ for (FileStatus status : files) {
+ temPath = status.getPath();
+ if (temPath.getName().equals(targetMaster)) {
+ return 0;
+ }
+ }
+
+ // Check active master
+ files = fs.listStatus(activePath);
+ if (files.length == 1) {
+ temPath = files[0].getPath();
+ if (temPath.getName().equals(targetMaster)) {
+ return 1;
+ }
+ }
+ retValue = -2;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return retValue;
+ }
+
+ public static int formatHA(TajoConf conf) {
+ int retValue = -1;
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+ Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+ Path temPath = null;
+
+ int aliveMasterCount = 0;
+ // Check backup masters
+ FileStatus[] files = fs.listStatus(backupPath);
+ for (FileStatus status : files) {
+ temPath = status.getPath();
+ if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+ aliveMasterCount++;
+ }
+ }
+
+ // Check active master
+ files = fs.listStatus(activePath);
+ if (files.length == 1) {
+ temPath = files[0].getPath();
+ if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+ aliveMasterCount++;
+ }
+ }
+
+ // If there is any alive master, users can't format storage.
+ if (aliveMasterCount > 0) {
+ return 0;
+ }
+
+ // delete ha path.
+ fs.delete(TajoConf.getSystemHADir(conf), true);
+ retValue = 1;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return retValue;
+ }
+
+
+ public static List<String> getMasters(TajoConf conf) {
+ List<String> list = new ArrayList<String>();
+
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+ Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+ Path temPath = null;
+
+ // Check backup masters
+ FileStatus[] files = fs.listStatus(backupPath);
+ for (FileStatus status : files) {
+ temPath = status.getPath();
+ list.add(temPath.getName().replaceAll("_", ":"));
+ }
+
+ // Check active master
+ files = fs.listStatus(activePath);
+ if (files.length == 1) {
+ temPath = files[0].getPath();
+ list.add(temPath.getName().replaceAll("_", ":"));
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return list;
+ }
+
+ private static FileSystem getFileSystem(TajoConf conf) throws IOException {
+ Path rootPath = TajoConf.getTajoRootDir(conf);
+ return rootPath.getFileSystem(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
deleted file mode 100644
index c6fdd40..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.net.InetSocketAddress;
-
-public class TajoMasterInfo {
-
- private boolean available;
- private boolean isActive;
-
- private InetSocketAddress tajoMasterAddress;
- private InetSocketAddress tajoClientAddress;
- private InetSocketAddress workerResourceTrackerAddr;
- private InetSocketAddress catalogAddress;
- private InetSocketAddress webServerAddress;
-
- public InetSocketAddress getTajoMasterAddress() {
- return tajoMasterAddress;
- }
-
- public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
- this.tajoMasterAddress = tajoMasterAddress;
- }
-
- public InetSocketAddress getTajoClientAddress() {
- return tajoClientAddress;
- }
-
- public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
- this.tajoClientAddress = tajoClientAddress;
- }
-
- public InetSocketAddress getWorkerResourceTrackerAddr() {
- return workerResourceTrackerAddr;
- }
-
- public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
- this.workerResourceTrackerAddr = workerResourceTrackerAddr;
- }
-
- public InetSocketAddress getCatalogAddress() {
- return catalogAddress;
- }
-
- public void setCatalogAddress(InetSocketAddress catalogAddress) {
- this.catalogAddress = catalogAddress;
- }
-
- public InetSocketAddress getWebServerAddress() {
- return webServerAddress;
- }
-
- public void setWebServerAddress(InetSocketAddress webServerAddress) {
- this.webServerAddress = webServerAddress;
- }
-
- public boolean isAvailable() {
- return available;
- }
-
- public void setAvailable(boolean available) {
- this.available = available;
- }
-
- public boolean isActive() {
- return isActive;
- }
-
- public void setActive(boolean active) {
- isActive = active;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 42ffd87..996d356 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -38,6 +38,8 @@ import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.worker.TajoWorker;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -45,6 +47,7 @@ import java.util.List;
public class TajoContainerProxy extends ContainerProxy {
private final QueryContext queryContext;
+ private final TajoWorker.WorkerContext workerContext;
private final String planJson;
public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
@@ -52,6 +55,7 @@ public class TajoContainerProxy extends ContainerProxy {
QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
super(context, conf, executionBlockId, container);
this.queryContext = queryContext;
+ this.workerContext = context.getQueryMasterContext().getWorkerContext();
this.planJson = planJson;
}
@@ -171,27 +175,8 @@ public class TajoContainerProxy extends ContainerProxy {
RpcConnectionPool connPool = RpcConnectionPool.getPool();
NettyClientBase tmClient = null;
try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- TajoConf conf = context.getConf();
- if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- } catch (Exception e) {
- context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(conf));
- context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(conf));
- tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
+ ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker();
+ tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.releaseWorkerResource(null,
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 786025a..a11606f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -40,18 +40,18 @@ import org.apache.tajo.catalog.LocalCatalogWrapper;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.FunctionLoader;
-import org.apache.tajo.ha.HAService;
-import org.apache.tajo.ha.HAServiceHDFSImpl;
-import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
-import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.SessionManager;
+import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rule.EvaluationContext;
import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.session.SessionManager;
import org.apache.tajo.storage.FileStorageManager;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.util.*;
@@ -127,7 +127,7 @@ public class TajoMaster extends CompositeService {
private TajoSystemMetrics systemMetrics;
- private HAService haService;
+ private ServiceTracker haService;
private JvmPauseMonitor pauseMonitor;
@@ -226,15 +226,6 @@ public class TajoMaster extends CompositeService {
}
}
-
- private void initHAManger() throws Exception {
- // If tajo provides haService based on ZooKeeper, following codes need to update.
- if (systemConf.getBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE)) {
- haService = new HAServiceHDFSImpl(context);
- haService.register();
- }
- }
-
public boolean isActiveMaster() {
return (haService != null ? haService.isActiveStatus() : true);
}
@@ -326,11 +317,8 @@ public class TajoMaster extends CompositeService {
initSystemMetrics();
- try {
- initHAManger();
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
+ haService = ServiceTrackerFactory.get(systemConf);
+ haService.register();
historyWriter = new HistoryWriter(getMasterName(), true);
historyWriter.init(getConfig());
@@ -477,7 +465,7 @@ public class TajoMaster extends CompositeService {
return systemMetrics;
}
- public HAService getHAService() {
+ public ServiceTracker getHAService() {
return haService;
}
[02/10] tajo git commit: TAJO-1320: HBaseStorageManager need to
support Zookeeper Client Port. (missing file)
Posted by ji...@apache.org.
TAJO-1320: HBaseStorageManager need to support Zookeeper Client Port. (missing file)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/015913b7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/015913b7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/015913b7
Branch: refs/heads/index_support
Commit: 015913b7a7eec922475a67d9455457f229367af7
Parents: b9719ba
Author: JaeHwa Jung <bl...@apache.org>
Authored: Wed Jan 28 14:31:34 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Wed Jan 28 14:31:34 2015 +0900
----------------------------------------------------------------------
tajo-docs/src/main/sphinx/hbase_integration.rst | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/015913b7/tajo-docs/src/main/sphinx/hbase_integration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/hbase_integration.rst b/tajo-docs/src/main/sphinx/hbase_integration.rst
index 73ef6d1..779223b 100644
--- a/tajo-docs/src/main/sphinx/hbase_integration.rst
+++ b/tajo-docs/src/main/sphinx/hbase_integration.rst
@@ -29,14 +29,16 @@ CREATE TABLE
USING hbase
WITH ('table'='<hbase_table_name>'
, 'columns'=':key,<column_family_name>:<qualifier_name>, ...'
- , 'hbase.zookeeper.quorum'='<zookeeper_address>')
+ , 'hbase.zookeeper.quorum'='<zookeeper_address>'
+ , 'hbase.zookeeper.property.clientPort'='<zookeeper_client_port>'
+ )
Options
* ``table`` : Set hbase origin table name. If you want to create an external table, the table must exists on HBase. The other way, if you want to create a managed table, the table must doesn't exist on HBase.
* ``columns`` : :key means HBase row key. The number of columns entry need to equals to the number of Tajo table column
* ``hbase.zookeeper.quorum`` : Set zookeeper quorum address. You can use different zookeeper cluster on the same Tajo database. If you don't set the zookeeper address, Tajo will refer the property of hbase-site.xml file.
-
+* ``hbase.zookeeper.property.clientPort`` : Set zookeeper client port. If you don't set the port, Tajo will refer the property of hbase-site.xml file.
``IF NOT EXISTS`` allows ``CREATE [EXTERNAL] TABLE`` statement to avoid an error which occurs when the table does not exist.
[09/10] tajo git commit: TAJO-1321: Cli prints wrong response time.
(jihoon)
Posted by ji...@apache.org.
TAJO-1321: Cli prints wrong response time. (jihoon)
Closes #369
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/58bbb1bb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/58bbb1bb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/58bbb1bb
Branch: refs/heads/index_support
Commit: 58bbb1bb424288fdb3e731374de9938911d8e0f3
Parents: 1e00759
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Feb 2 21:26:40 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Feb 2 21:27:01 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 ++
.../main/java/org/apache/tajo/master/QueryInProgress.java | 2 +-
.../src/main/java/org/apache/tajo/master/QueryManager.java | 4 ----
.../java/org/apache/tajo/master/TajoMasterClientService.java | 8 ++------
.../main/java/org/apache/tajo/querymaster/QueryMaster.java | 1 -
tajo-core/src/main/proto/QueryCoordinatorProtocol.proto | 1 -
6 files changed, 5 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 666ee9a..3667991 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,8 @@ Release 0.10.0 - unreleased
BUG FIXES
+ TAJO-1321: Cli prints wrong response time. (jihoon)
+
TAJO-1313: Tajo-dump creates DDLs for information_schema tables.
(jihun)
http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index e7371dd..45bdc5a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -208,7 +208,6 @@ public class QueryInProgress {
this.queryInfo.setQueryState(queryInfo.getQueryState());
this.queryInfo.setProgress(queryInfo.getProgress());
- this.queryInfo.setFinishTime(queryInfo.getFinishTime());
// Update diagnosis message
if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
@@ -223,6 +222,7 @@ public class QueryInProgress {
if (isFinishState(this.queryInfo.getQueryState())) {
+ this.queryInfo.setFinishTime(System.currentTimeMillis());
masterContext.getQueryJobManager().getEventHandler().handle(
new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index bc6f07b..db895ef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -320,10 +320,6 @@ public class QueryManager extends CompositeService {
queryInfo.setQueryState(queryHeartbeat.getState());
queryInfo.setProgress(queryHeartbeat.getQueryProgress());
- if (queryHeartbeat.hasQueryFinishTime()) {
- queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
- }
-
if (queryHeartbeat.hasResultDesc()) {
queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 16e4fea..6af3248 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -393,9 +393,7 @@ public class TajoMasterClientService extends AbstractService {
infoBuilder.setState(queryInfo.getQueryState());
infoBuilder.setQuery(queryInfo.getSql());
infoBuilder.setStartTime(queryInfo.getStartTime());
- long endTime = (queryInfo.getFinishTime() == 0) ?
- System.currentTimeMillis() : queryInfo.getFinishTime();
- infoBuilder.setFinishTime(endTime);
+ infoBuilder.setFinishTime(System.currentTimeMillis());
infoBuilder.setProgress(queryInfo.getProgress());
infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
@@ -428,9 +426,7 @@ public class TajoMasterClientService extends AbstractService {
infoBuilder.setState(queryInfo.getQueryState());
infoBuilder.setQuery(queryInfo.getSql());
infoBuilder.setStartTime(queryInfo.getStartTime());
- long endTime = (queryInfo.getFinishTime() == 0) ?
- System.currentTimeMillis() : queryInfo.getFinishTime();
- infoBuilder.setFinishTime(endTime);
+ infoBuilder.setFinishTime(queryInfo.getFinishTime());
infoBuilder.setProgress(queryInfo.getProgress());
infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index a30df54..234a46a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -391,7 +391,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto());
}
builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
- builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/58bbb1bb/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
index 41a382f..cef385e 100644
--- a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -69,7 +69,6 @@ message TajoHeartbeat {
optional TableDescProto resultDesc = 4;
optional string statusMessage = 5;
optional float queryProgress = 6;
- optional int64 queryFinishTime = 7;
}
message TajoHeartbeatResponse {