You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2014/08/19 16:48:03 UTC
[2/2] git commit: TAJO-704: TajoMaster HA (jaehwa)
TAJO-704: TajoMaster HA (jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b16d13ad
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b16d13ad
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b16d13ad
Branch: refs/heads/master
Commit: b16d13addacc8c03f7b46b912a02231cea4c0861
Parents: 35c2492
Author: Jaehwa Jung <bl...@apache.org>
Authored: Tue Aug 19 23:44:21 2014 +0900
Committer: Jaehwa Jung <bl...@apache.org>
Committed: Tue Aug 19 23:44:21 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/catalog/AbstractCatalogClient.java | 75 +++--
.../org/apache/tajo/catalog/CatalogServer.java | 2 +-
.../main/java/org/apache/tajo/cli/TajoCli.java | 24 +-
.../org/apache/tajo/cli/TajoHAAdminCommand.java | 57 ++++
.../java/org/apache/tajo/client/TajoAdmin.java | 27 +-
.../java/org/apache/tajo/client/TajoClient.java | 78 +++--
.../org/apache/tajo/client/TajoGetConf.java | 4 +-
.../org/apache/tajo/client/TajoHAAdmin.java | 210 +++++++++++++
.../apache/tajo/client/TajoHAClientUtil.java | 87 ++++++
.../java/org/apache/tajo/TajoConstants.java | 5 +
.../java/org/apache/tajo/conf/TajoConf.java | 8 +
.../org/apache/tajo/util/HAServiceUtil.java | 293 +++++++++++++++++++
.../apache/tajo/master/TajoContainerProxy.java | 24 +-
.../java/org/apache/tajo/master/TajoMaster.java | 36 +++
.../org/apache/tajo/master/ha/HAService.java | 56 ++++
.../tajo/master/ha/HAServiceHDFSImpl.java | 278 ++++++++++++++++++
.../apache/tajo/master/ha/TajoMasterInfo.java | 89 ++++++
.../tajo/master/querymaster/QueryMaster.java | 91 +++++-
.../master/querymaster/QueryMasterTask.java | 24 +-
.../tajo/webapp/QueryExecutorServlet.java | 5 +
.../tajo/worker/TajoResourceAllocator.java | 29 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 36 ++-
.../tajo/worker/WorkerHeartbeatService.java | 18 +-
.../resources/webapps/admin/catalogview.jsp | 13 +-
.../main/resources/webapps/admin/cluster.jsp | 87 +++++-
.../src/main/resources/webapps/admin/index.jsp | 47 ++-
.../src/main/resources/webapps/admin/query.jsp | 13 +-
.../resources/webapps/admin/query_executor.jsp | 13 +-
.../org/apache/tajo/TajoTestingCluster.java | 7 +-
.../tajo/master/ha/TestHAServiceHDFSImpl.java | 133 +++++++++
tajo-dist/src/main/bin/start-tajo.sh | 15 +-
tajo-dist/src/main/bin/stop-tajo.sh | 16 +-
tajo-dist/src/main/bin/tajo | 5 +
tajo-dist/src/main/bin/tajo-daemons.sh | 1 +
tajo-docs/src/main/sphinx/configuration.rst | 3 +-
.../sphinx/configuration/ha_configuration.rst | 135 +++++++++
37 files changed, 1937 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 841eb49..b605c71 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.9.0 - unreleased
NEW FEATURES
+ TAJO-704: TajoMaster HA (jaehwa)
+
TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik)
TAJO-774: Implement logical plan part and physical executor for window
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 542214b..1f1e808 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -33,6 +33,7 @@ import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.ProtoUtil;
import java.net.InetSocketAddress;
@@ -58,10 +59,26 @@ public abstract class AbstractCatalogClient implements CatalogService {
this.conf = conf;
}
+ private InetSocketAddress getCatalogServerAddr() {
+ if (catalogServerAddr == null) {
+ return null;
+ } else {
+ if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ return catalogServerAddr;
+ } else {
+ if (!HAServiceUtil.isMasterAlive(catalogServerAddr, conf)) {
+ return HAServiceUtil.getCatalogAddress(conf);
+ } else {
+ return catalogServerAddr;
+ }
+ }
+ }
+ }
+
@Override
public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) {
try {
- return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
@@ -80,7 +97,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean dropTablespace(final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
@@ -95,7 +112,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean existTablespace(final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
@@ -110,7 +127,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<String> getAllTablespaceNames() {
try {
- return new ServerCallable<Collection<String>>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Collection<String> call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
@@ -126,7 +143,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public TablespaceProto getTablespace(final String tablespaceName) {
try {
- return new ServerCallable<TablespaceProto>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<TablespaceProto>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public TablespaceProto call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
@@ -141,7 +158,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) {
try {
- return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.alterTablespace(null, alterTablespace).getValue();
@@ -156,7 +173,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
@@ -177,7 +194,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean dropDatabase(final String databaseName) {
try {
- return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
@@ -192,7 +209,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean existDatabase(final String databaseName) {
try {
- return new ServerCallable<Boolean>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
@@ -207,7 +224,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<String> getAllDatabaseNames() {
try {
- return new ServerCallable<Collection<String>>(pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Collection<String> call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
@@ -223,7 +240,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final TableDesc getTableDesc(final String databaseName, final String tableName) {
try {
- return new ServerCallable<TableDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<TableDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public TableDesc call(NettyClientBase client) throws ServiceException {
TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
builder.setDatabaseName(databaseName);
@@ -248,7 +265,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<PartitionMethodDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<PartitionMethodDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -268,7 +285,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean existPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -288,7 +305,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<String> getAllTableNames(final String databaseName) {
try {
- return new ServerCallable<Collection<String>>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Collection<String>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Collection<String> call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
@@ -304,7 +321,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<FunctionDesc> getFunctions() {
try {
- return new ServerCallable<Collection<FunctionDesc>>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Collection<FunctionDesc>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
List<FunctionDesc> list = new ArrayList<FunctionDesc>();
GetFunctionsResponse response;
@@ -331,7 +348,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean createTable(final TableDesc desc) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.createTable(null, desc.getProto()).getValue();
@@ -350,7 +367,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
final String simpleName = splitted[1];
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -374,7 +391,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
"tableName cannot be composed of multiple parts, but it is \"" + tableName + "\"");
}
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -399,7 +416,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean createIndex(final IndexDesc index) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.createIndex(null, index.getProto()).getValue();
@@ -414,7 +431,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean existIndexByName(final String databaseName, final String indexName) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
IndexNameProto.Builder builder = IndexNameProto.newBuilder();
builder.setDatabaseName(databaseName);
@@ -433,7 +450,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
@@ -453,7 +470,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final IndexDesc getIndexByName(final String databaseName, final String indexName) {
try {
- return new ServerCallable<IndexDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public IndexDesc call(NettyClientBase client) throws ServiceException {
IndexNameProto.Builder builder = IndexNameProto.newBuilder();
@@ -475,7 +492,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
final String tableName,
final String columnName) {
try {
- return new ServerCallable<IndexDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public IndexDesc call(NettyClientBase client) throws ServiceException {
GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
@@ -496,7 +513,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
public boolean dropIndex(final String databaseName,
final String indexName) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
IndexNameProto.Builder builder = IndexNameProto.newBuilder();
@@ -516,7 +533,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean createFunction(final FunctionDesc funcDesc) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.createFunction(null, funcDesc.getProto()).getValue();
@@ -531,7 +548,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean dropFunction(final String signature) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
builder.setSignature(signature);
@@ -564,7 +581,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
FunctionDescProto descProto = null;
try {
- descProto = new ServerCallable<FunctionDescProto>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ descProto = new ServerCallable<FunctionDescProto>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public FunctionDescProto call(NettyClientBase client) throws ServiceException {
try {
CatalogProtocolService.BlockingInterface stub = getStub(client);
@@ -614,7 +631,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.containFunction(null, builder.build()).getValue();
@@ -629,7 +646,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean alterTable(final AlterTableDesc desc) {
try {
- return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
return stub.alterTable(null, desc.getProto()).getValue();
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 4e391aa..cf3ea6f 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -887,7 +887,7 @@ public class CatalogServer extends AbstractService {
} else {
for (FunctionDescProto existing : functions.get(signature)) {
if (existing.getParameterTypesList() != null &&
- CatalogUtil.isMatchedFunction(existing.getParameterTypesList(), params)) {
+ CatalogUtil.isMatchedFunction(existing.getParameterTypesList(), params)) {
return existing;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index fdc766e..2f9e0b2 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -28,10 +28,12 @@ import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoHAClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.HAServiceUtil;
import java.io.*;
import java.lang.reflect.Constructor;
@@ -81,7 +83,8 @@ public class TajoCli {
ExecExternalShellCommand.class,
HdfsCommand.class,
TajoAdminCommand.class,
- TajoGetConfCommand.class
+ TajoGetConfCommand.class,
+ TajoHAAdminCommand.class
};
private final Map<String, TajoShellCommand> commands = new TreeMap<String, TajoShellCommand>();
@@ -226,6 +229,7 @@ public class TajoCli {
client = new TajoClient(conf, baseDatabase);
}
+ checkMasterStatus();
context.setCurrentDatabase(client.getCurrentDatabase());
initHistory();
initCommands();
@@ -419,6 +423,7 @@ public class TajoCli {
}
public int executeMetaCommand(String line) throws Exception {
+ checkMasterStatus();
String [] metaCommands = line.split(";");
for (String metaCommand : metaCommands) {
String arguments [] = metaCommand.split(" ");
@@ -452,7 +457,8 @@ public class TajoCli {
return 0;
}
- private void executeJsonQuery(String json) throws ServiceException {
+ private void executeJsonQuery(String json) throws ServiceException, IOException {
+ checkMasterStatus();
long startTime = System.currentTimeMillis();
ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json);
if (response == null) {
@@ -478,7 +484,8 @@ public class TajoCli {
}
}
- private int executeQuery(String statement) throws ServiceException {
+ private int executeQuery(String statement) throws ServiceException, IOException {
+ checkMasterStatus();
long startTime = System.currentTimeMillis();
ClientProtos.SubmitQueryResponse response = client.executeQuery(statement);
if (response == null) {
@@ -626,6 +633,17 @@ public class TajoCli {
}
}
+ private void checkMasterStatus() throws IOException, ServiceException {
+ String sessionId = client.getSessionId() != null ? client.getSessionId().getId() : null;
+ client = TajoHAClientUtil.getTajoClient(conf, client, context);
+ if(sessionId != null && (client.getSessionId() == null ||
+ !sessionId.equals(client.getSessionId().getId()))) {
+ commands.clear();
+ initHistory();
+ initCommands();
+ }
+ }
+
public static void main(String [] args) throws Exception {
TajoConf conf = new TajoConf();
TajoCli shell = new TajoCli(conf, args, System.in, System.out);
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java
new file mode 100644
index 0000000..ad88b3f
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoHAAdminCommand.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.client.TajoHAAdmin;
+
+public class TajoHAAdminCommand extends TajoShellCommand {
+ private TajoHAAdmin haAdmin;
+
+ public TajoHAAdminCommand(TajoCli.TajoCliContext context) {
+ super(context);
+ haAdmin = new TajoHAAdmin(context.getConf(), context.getOutput(), context.getTajoClient());
+ }
+
+ @Override
+ public String getCommand() {
+ return "\\haadmin";
+ }
+
+ @Override
+ public void invoke(String[] command) throws Exception {
+ try {
+ String[] haAdminCommands = new String[command.length - 1];
+ System.arraycopy(command, 1, haAdminCommands, 0, haAdminCommands.length);
+
+ haAdmin.runCommand(haAdminCommands);
+ } catch (Exception e) {
+ context.getOutput().println("ERROR: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public String getUsage() {
+ return "<command> [options]";
+ }
+
+ @Override
+ public String getDescription() {
+ return "execute a tajo haAdminF command.";
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 58b7184..95dfc68 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -27,6 +27,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
@@ -173,6 +174,7 @@ public class TajoAdmin {
private void processDesc(Writer writer) throws ParseException, IOException,
ServiceException, SQLException {
+ tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
List<BriefQueryInfo> queryList = tajoClient.getRunningQueryList();
SimpleDateFormat df = new SimpleDateFormat(DATE_FORMAT);
int id = 1;
@@ -212,6 +214,7 @@ public class TajoAdmin {
private void processCluster(Writer writer) throws ParseException, IOException,
ServiceException, SQLException {
+ tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
List<WorkerResourceInfo> workerList = tajoClient.getClusterInfo();
int runningQueryMasterTasks = 0;
@@ -376,6 +379,7 @@ public class TajoAdmin {
private void processList(Writer writer) throws ParseException, IOException,
ServiceException, SQLException {
+ tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
List<BriefQueryInfo> queryList = tajoClient.getRunningQueryList();
SimpleDateFormat df = new SimpleDateFormat(DATE_FORMAT);
StringBuilder builder = new StringBuilder();
@@ -416,10 +420,25 @@ public class TajoAdmin {
private void processMasters(Writer writer) throws ParseException, IOException,
ServiceException, SQLException {
- String confMasterServiceAddr = tajoClient.getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- InetSocketAddress masterAddress = NetUtils.createSocketAddr(confMasterServiceAddr);
- writer.write(masterAddress.getHostName());
- writer.write("\n");
+ tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
+ if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+
+ List<String> list = HAServiceUtil.getMasters(tajoConf);
+ int i = 0;
+ for (String master : list) {
+ if (i > 0) {
+ writer.write(" ");
+ }
+ writer.write(master);
+ i++;
+ }
+ writer.write("\n");
+ } else {
+ String confMasterServiceAddr = tajoClient.getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+ InetSocketAddress masterAddress = NetUtils.createSocketAddr(confMasterServiceAddr);
+ writer.write(masterAddress.getHostName());
+ writer.write("\n");
+ }
}
public static void main(String [] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 3a90a48..cc993f3 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -29,7 +29,10 @@ import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.annotation.ThreadSafe;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.cli.InvalidClientSessionException;
import org.apache.tajo.conf.TajoConf;
@@ -46,6 +49,7 @@ import org.apache.tajo.jdbc.TajoResultSet;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
@@ -130,15 +134,30 @@ public class TajoClient implements Closeable {
return sessionId;
}
+ private InetSocketAddress getTajoMasterAddr() {
+ if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ return tajoMasterAddr;
+ } else {
+ if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
+ return HAServiceUtil.getMasterClientAddress(conf);
+ } else {
+ return tajoMasterAddr;
+ }
+ }
+ }
+
+ public String getBaseDatabase() {
+ return baseDatabase;
+ }
+
@Override
public void close() {
// remove session
try {
- NettyClientBase client = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
+ NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
tajoMaster.removeSession(null, sessionId);
} catch (Exception e) {
- LOG.error(e);
}
if(connPool != null) {
@@ -203,7 +222,7 @@ public class TajoClient implements Closeable {
}
public String getCurrentDatabase() throws ServiceException {
- return new ServerCallable<String>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public String call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -215,7 +234,7 @@ public class TajoClient implements Closeable {
}
public Boolean selectDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -227,7 +246,7 @@ public class TajoClient implements Closeable {
}
public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -245,7 +264,7 @@ public class TajoClient implements Closeable {
}
public Boolean unsetSessionVariables(final List<String> variables) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -260,7 +279,7 @@ public class TajoClient implements Closeable {
}
public String getSessionVariable(final String varname) throws ServiceException {
- return new ServerCallable<String>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public String call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -272,7 +291,7 @@ public class TajoClient implements Closeable {
}
public Boolean existSessionVariable(final String varname) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -284,7 +303,7 @@ public class TajoClient implements Closeable {
}
public Map<String, String> getAllSessionVariables() throws ServiceException {
- return new ServerCallable<Map<String, String>>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class,
+ return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
public Map<String, String> call(NettyClientBase client) throws ServiceException {
@@ -304,7 +323,7 @@ public class TajoClient implements Closeable {
* or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
*/
public SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
- return new ServerCallable<SubmitQueryResponse>(connPool, tajoMasterAddr,
+ return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -320,7 +339,7 @@ public class TajoClient implements Closeable {
}
public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<SubmitQueryResponse>(connPool, tajoMasterAddr,
+ return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -412,7 +431,7 @@ public class TajoClient implements Closeable {
} else {
NettyClientBase tmClient = null;
try {
- tmClient = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
+ tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
checkSessionAndGet(tmClient);
builder.setSessionId(sessionId);
@@ -578,7 +597,7 @@ public class TajoClient implements Closeable {
}
public boolean updateQuery(final String sql) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -602,7 +621,7 @@ public class TajoClient implements Closeable {
}
public boolean updateQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -633,7 +652,7 @@ public class TajoClient implements Closeable {
* @throws ServiceException
*/
public boolean createDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -650,7 +669,7 @@ public class TajoClient implements Closeable {
* @throws ServiceException
*/
public boolean existDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -667,7 +686,7 @@ public class TajoClient implements Closeable {
* @throws ServiceException
*/
public boolean dropDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -677,7 +696,7 @@ public class TajoClient implements Closeable {
}
public List<String> getAllDatabaseNames() throws ServiceException {
- return new ServerCallable<List<String>>(connPool, tajoMasterAddr, TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public List<String> call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -693,7 +712,7 @@ public class TajoClient implements Closeable {
* @return True if so.
*/
public boolean existTable(final String tableName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -718,7 +737,7 @@ public class TajoClient implements Closeable {
public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
final TableMeta meta)
throws SQLException, ServiceException {
- return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
+ return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
checkSessionAndGet(client);
@@ -759,7 +778,7 @@ public class TajoClient implements Closeable {
* @return True if the table is dropped successfully.
*/
public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -777,7 +796,7 @@ public class TajoClient implements Closeable {
}
public List<BriefQueryInfo> getRunningQueryList() throws ServiceException {
- return new ServerCallable<List<BriefQueryInfo>>(connPool, tajoMasterAddr,
+ return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -792,7 +811,7 @@ public class TajoClient implements Closeable {
}
public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException {
- return new ServerCallable<List<BriefQueryInfo>>(connPool, tajoMasterAddr,
+ return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -806,7 +825,7 @@ public class TajoClient implements Closeable {
}
public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
- return new ServerCallable<List<WorkerResourceInfo>>(connPool, tajoMasterAddr,
+ return new ServerCallable<List<WorkerResourceInfo>>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -829,7 +848,7 @@ public class TajoClient implements Closeable {
* in the current database of this session.
*/
public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
- return new ServerCallable<List<String>>(connPool, tajoMasterAddr,
+ return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public List<String> call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -854,8 +873,7 @@ public class TajoClient implements Closeable {
* @return Table description
*/
public TableDesc getTableDesc(final String tableName) throws ServiceException {
- return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
- TajoMasterClientProtocol.class, false, true) {
+ return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
checkSessionAndGet(client);
@@ -882,7 +900,7 @@ public class TajoClient implements Closeable {
NettyClientBase tmClient = null;
try {
/* send a kill to the TM */
- tmClient = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
+ tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
checkSessionAndGet(tmClient);
@@ -915,7 +933,7 @@ public class TajoClient implements Closeable {
}
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
- return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, tajoMasterAddr,
+ return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {
public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
checkSessionAndGet(client);
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
index 32e6382..52e1894 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
@@ -159,8 +159,8 @@ public class TajoGetConf {
public static void main(String [] args) throws Exception {
TajoConf conf = new TajoConf();
- Writer writer = new PrintWriter(System.out); try {
- System.out.println("### 1000 ###");
+ Writer writer = new PrintWriter(System.out);
+ try {
TajoGetConf admin = new TajoGetConf(conf, writer);
admin.runCommand(args, false);
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
new file mode 100644
index 0000000..11cb4ed
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.cli.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.util.HAServiceUtil;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.List;
+
+public class TajoHAAdmin {
+ private static final Options options;
+
+ static {
+ options = new Options();
+ options.addOption("h", "host", true, "Tajo server host");
+ options.addOption("p", "port", true, "Tajo server port");
+ options.addOption("transitionToActive", null, true, "Transitions the master into Active state");
+ options.addOption("transitionToBackup", null, true, "Transitions the master into Backup state");
+ options.addOption("getState", null, true, "Returns the state of the master");
+ options.addOption("formatHA", null, false, "Format HA status on share storage");
+ }
+
+ private TajoConf tajoConf;
+ private TajoClient tajoClient;
+ private Writer writer;
+
+ public TajoHAAdmin(TajoConf tajoConf, Writer writer) {
+ this(tajoConf, writer, null);
+ }
+
+ public TajoHAAdmin(TajoConf tajoConf, Writer writer, TajoClient tajoClient) {
+ this.tajoConf = tajoConf;
+ this.writer = writer;
+ this.tajoClient = tajoClient;
+ }
+
+ private void printUsage() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "haadmin [options]", options );
+ }
+
+ public void runCommand(String[] args) throws Exception {
+ if(args.length == 1 &&
+ (args[0].equalsIgnoreCase("-transitionToActive")
+ || args[0].equalsIgnoreCase("-transitionToBackup")
+ || args[0].equalsIgnoreCase("-getState"))) {
+ writer.write("Not enough arguments: expected 1 but got 0\n");
+ writer.flush();
+ return;
+ }
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ String param = "";
+ int cmdType = 0;
+
+ String hostName = null;
+ Integer port = null;
+ if (cmd.hasOption("h")) {
+ hostName = cmd.getOptionValue("h");
+ }
+ if (cmd.hasOption("p")) {
+ port = Integer.parseInt(cmd.getOptionValue("p"));
+ }
+
+ if (cmd.hasOption("transitionToActive")) {
+ cmdType = 1;
+ param = cmd.getOptionValue("transitionToActive");
+ } else if (cmd.hasOption("transitionToBackup")) {
+ cmdType = 2;
+ param = cmd.getOptionValue("transitionToBackup");
+ } else if (cmd.hasOption("getState")) {
+ cmdType = 3;
+ param = cmd.getOptionValue("getState");
+ } else if (cmd.hasOption("formatHA")) {
+ cmdType = 4;
+ }
+
+ // if there is no "-h" option,
+ if(hostName == null) {
+ if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
+ // it checks if the client service address is given in configuration and distributed mode.
+ // if so, it sets entryAddr.
+ hostName = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
+ }
+ }
+ if (port == null) {
+ if (tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
+ // it checks if the client service address is given in configuration and distributed mode.
+ // if so, it sets entryAddr.
+ port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
+ }
+ }
+
+ if (cmdType == 0) {
+ printUsage();
+ return;
+ }
+
+
+ if ((hostName == null) ^ (port == null)) {
+ System.err.println("ERROR: cannot find valid Tajo server address");
+ return;
+ } else if (hostName != null && port != null) {
+ tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
+ tajoClient = new TajoClient(tajoConf);
+ } else if (hostName == null && port == null) {
+ tajoClient = new TajoClient(tajoConf);
+ }
+
+ if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ writer.write("HA is not enabled for this tajo cluster.");
+ } else {
+ switch (cmdType) {
+ case 1:
+ writer.write("Not Yet Implemented\n");
+ break;
+ case 2:
+ writer.write("Not Yet Implemented\n");
+ break;
+ case 3:
+ getState(writer, param);
+ break;
+ case 4:
+ formatHA(writer);
+ break;
+ default:
+ printUsage();
+ break;
+ }
+ }
+
+ writer.flush();
+ }
+
+ private void getState(Writer writer, String param) throws ParseException, IOException,
+ ServiceException {
+ tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
+ int retValue = HAServiceUtil.getState(param, tajoClient.getConf());
+
+ switch (retValue) {
+ case 1:
+ writer.write("The master is active.\n");
+ break;
+ case 0:
+ writer.write("The master is backup.\n");
+ break;
+ case -1:
+ writer.write("Finding failed. - master:" + param + "\n");
+ break;
+ default:
+ writer.write("Cannot find the master. - master:" + param + "\n");
+ break;
+ }
+ }
+
+ private void formatHA(Writer writer) throws ParseException, IOException,
+ ServiceException {
+ int retValue = HAServiceUtil.formatHA(tajoClient.getConf());
+
+ switch (retValue) {
+ case 1:
+ writer.write("Formatting finished successfully.\n");
+ break;
+ case 0:
+ writer.write("If you want to format the ha information, you must shutdown tajo masters "
+ + " before formatting.\n");
+ break;
+ default:
+ writer.write("Cannot format ha information.\n");
+ break;
+ }
+ }
+
+ public static void main(String [] args) throws Exception {
+ TajoConf conf = new TajoConf();
+
+ Writer writer = new PrintWriter(System.out);
+ try {
+ TajoHAAdmin admin = new TajoHAAdmin(conf, writer);
+ admin.runCommand(args);
+ } finally {
+ writer.close();
+ System.exit(0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
new file mode 100644
index 0000000..f22d5ba
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.cli.TajoCli.TajoCliContext;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.HAServiceUtil;
+
+import java.io.IOException;
+
+public class TajoHAClientUtil {
+
+ /**
+ * In TajoMaster HA mode, if TajoCli can't connect existing active master,
+ * this should try to connect new active master.
+ *
+ * @param conf
+ * @param client
+ * @return
+ * @throws IOException
+ * @throws ServiceException
+ */
+ public static TajoClient getTajoClient(TajoConf conf, TajoClient client)
+ throws IOException, ServiceException {
+ return getTajoClient(conf, client, null);
+ }
+
+ public static TajoClient getTajoClient(TajoConf conf, TajoClient client,
+ TajoCliContext context) throws IOException, ServiceException {
+
+ if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ if (!HAServiceUtil.isMasterAlive(conf.getVar(
+ TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
+ TajoClient tajoClient = null;
+ String baseDatabase = client.getBaseDatabase();
+ conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
+ HAServiceUtil.getMasterClientName(conf));
+ client.close();
+ tajoClient = new TajoClient(conf, baseDatabase);
+
+ if (context != null && context.getCurrentDatabase() != null) {
+ tajoClient.selectDatabase(context.getCurrentDatabase());
+ }
+ return tajoClient;
+ } else {
+ return client;
+ }
+ } else {
+ return client;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
index 5f28f1c..ffe8c02 100644
--- a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
@@ -35,5 +35,10 @@ public class TajoConstants {
public static final String EMPTY_STRING = "";
+ public static final String SYSTEM_HA_DIR_NAME = "ha";
+ public static final String SYSTEM_HA_ACTIVE_DIR_NAME = "active";
+ public static final String SYSTEM_HA_BACKUP_DIR_NAME = "backup";
+
+
private TajoConstants() {}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 2504c23..f4229e7 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -158,6 +158,10 @@ public class TajoConf extends Configuration {
TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002"),
TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080"),
+ // Tajo Master HA Configurations
+ TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false),
+ TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
+
// Resource tracker service
RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003"),
RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120 * 1000), // seconds
@@ -597,6 +601,10 @@ public class TajoConf extends Configuration {
return new Path(getSystemDir(conf), TajoConstants.SYSTEM_RESOURCE_DIR_NAME);
}
+ public static Path getSystemHADir(TajoConf conf) {
+ return new Path(getSystemDir(conf), TajoConstants.SYSTEM_HA_DIR_NAME);
+ }
+
private static boolean hasScheme(String path) {
return path.indexOf("file:/") == 0 || path.indexOf("hdfs:/") == 0;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
new file mode 100644
index 0000000..4f03113
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HAServiceUtil {
+
+ private final static int MASTER_UMBILICAL_RPC_ADDRESS = 1;
+ private final static int MASTER_CLIENT_RPC_ADDRESS = 2;
+ private final static int RESOURCE_TRACKER_RPC_ADDRESS = 3;
+ private final static int CATALOG_ADDRESS = 4;
+ private final static int MASTER_INFO_ADDRESS = 5;
+
+ public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
+ return getMasterAddress(conf, MASTER_UMBILICAL_RPC_ADDRESS);
+ }
+
+ public static String getMasterUmbilicalName(TajoConf conf) {
+ return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
+ }
+
+ public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
+ return getMasterAddress(conf, MASTER_CLIENT_RPC_ADDRESS);
+ }
+
+ public static String getMasterClientName(TajoConf conf) {
+ return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
+ }
+
+ public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
+ return getMasterAddress(conf, RESOURCE_TRACKER_RPC_ADDRESS);
+ }
+
+ public static String getResourceTrackerName(TajoConf conf) {
+ return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
+ }
+
+ public static InetSocketAddress getCatalogAddress(TajoConf conf) {
+ return getMasterAddress(conf, CATALOG_ADDRESS);
+ }
+
+ public static String getCatalogName(TajoConf conf) {
+ return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
+ }
+
+ public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
+ return getMasterAddress(conf, MASTER_INFO_ADDRESS);
+ }
+
+ public static String getMasterInfoName(TajoConf conf) {
+ return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
+ }
+
+ public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
+ InetSocketAddress masterAddress = null;
+
+ if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+
+ if (fs.exists(activePath)) {
+ FileStatus[] files = fs.listStatus(activePath);
+
+ if (files.length == 1) {
+ Path file = files[0].getPath();
+ String hostAddress = file.getName().replaceAll("_", ":");
+ FSDataInputStream stream = fs.open(file);
+ String data = stream.readUTF();
+ stream.close();
+
+ String[] addresses = data.split("_");
+
+ switch (type) {
+ case 1:
+ masterAddress = NetUtils.createSocketAddr(hostAddress);
+ break;
+ case 2:
+ masterAddress = NetUtils.createSocketAddr(addresses[0]);
+ break;
+ case 3:
+ masterAddress = NetUtils.createSocketAddr(addresses[1]);
+ break;
+ case 4:
+ masterAddress = NetUtils.createSocketAddr(addresses[2]);
+ break;
+ case 5:
+ masterAddress = NetUtils.createSocketAddr(addresses[3]);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ if (masterAddress == null) {
+ switch (type) {
+ case 1:
+ masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+ .TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+ break;
+ case 2:
+ masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+ .TAJO_MASTER_CLIENT_RPC_ADDRESS));
+ break;
+ case 3:
+ masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+ .RESOURCE_TRACKER_RPC_ADDRESS));
+ break;
+ case 4:
+ masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+ .CATALOG_ADDRESS));
+ break;
+ case 5:
+ masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
+ .TAJO_MASTER_INFO_ADDRESS));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return masterAddress;
+ }
+
+ public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
+ return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
+ }
+
+ public static boolean isMasterAlive(String masterName, TajoConf conf) {
+ boolean isAlive = true;
+
+ try {
+ // how to create sockets
+ SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
+
+ int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
+
+ InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
+
+ // connected socket
+ Socket socket = socketFactory.createSocket();
+ org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
+ } catch (Exception e) {
+ isAlive = false;
+ }
+ return isAlive;
+ }
+
+ public static int getState(String masterName, TajoConf conf) {
+ String targetMaster = masterName.replaceAll(":", "_");
+ int retValue = -1;
+
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+ Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+ Path temPath = null;
+
+ // Check backup masters
+ FileStatus[] files = fs.listStatus(backupPath);
+ for (FileStatus status : files) {
+ temPath = status.getPath();
+ if (temPath.getName().equals(targetMaster)) {
+ return 0;
+ }
+ }
+
+ // Check active master
+ files = fs.listStatus(activePath);
+ if (files.length == 1) {
+ temPath = files[0].getPath();
+ if (temPath.getName().equals(targetMaster)) {
+ return 1;
+ }
+ }
+ retValue = -2;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return retValue;
+ }
+
+ public static int formatHA(TajoConf conf) {
+ int retValue = -1;
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+ Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+ Path temPath = null;
+
+ int aliveMasterCount = 0;
+ // Check backup masters
+ FileStatus[] files = fs.listStatus(backupPath);
+ for (FileStatus status : files) {
+ temPath = status.getPath();
+ if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+ aliveMasterCount++;
+ }
+ }
+
+ // Check active master
+ files = fs.listStatus(activePath);
+ if (files.length == 1) {
+ temPath = files[0].getPath();
+ if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+ aliveMasterCount++;
+ }
+ }
+
+ // If there is any alive master, users can't format storage.
+ if (aliveMasterCount > 0) {
+ return 0;
+ }
+
+ // delete ha path.
+ fs.delete(TajoConf.getSystemHADir(conf), true);
+ retValue = 1;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return retValue;
+ }
+
+
+ public static List<String> getMasters(TajoConf conf) {
+ List<String> list = new ArrayList<String>();
+
+ try {
+ FileSystem fs = getFileSystem(conf);
+ Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+ Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+ Path temPath = null;
+
+ // Check backup masters
+ FileStatus[] files = fs.listStatus(backupPath);
+ for (FileStatus status : files) {
+ temPath = status.getPath();
+ list.add(temPath.getName().replaceAll("_", ":"));
+ }
+
+ // Check active master
+ files = fs.listStatus(activePath);
+ if (files.length == 1) {
+ temPath = files[0].getPath();
+ list.add(temPath.getName().replaceAll("_", ":"));
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return list;
+ }
+
+ private static FileSystem getFileSystem(TajoConf conf) throws IOException {
+ Path rootPath = TajoConf.getTajoRootDir(conf);
+ return rootPath.getFileSystem(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7f1eac6..cb7861c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -34,6 +35,7 @@ import org.apache.tajo.master.rm.TajoWorkerContainerId;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.util.HAServiceUtil;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -160,9 +162,29 @@ public class TajoContainerProxy extends ContainerProxy {
RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
NettyClientBase tmClient = null;
try {
+ // In TajoMaster HA mode, if backup master be active status,
+ // worker may fail to connect existing active master. Thus,
+ // if worker can't connect the master, worker should try to connect another master and
+ // update master address in worker context.
+ TajoConf conf = context.getConf();
+ if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ try {
+ tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ } catch (Exception e) {
+ context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
+ HAServiceUtil.getResourceTrackerAddress(conf));
+ context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
+ HAServiceUtil.getMasterUmbilicalAddress(conf));
+ tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ }
+ } else {
tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
TajoMasterProtocol.class, true);
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ }
+
+ TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
masterClientService.releaseWorkerResource(null,
TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 0962ca5..4e51460 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -45,6 +45,8 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.annotation.Description;
import org.apache.tajo.engine.function.annotation.ParamOptionTypes;
import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.master.ha.HAService;
+import org.apache.tajo.master.ha.HAServiceHDFSImpl;
import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet;
import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.master.querymaster.QueryJobManager;
@@ -130,6 +132,8 @@ public class TajoMaster extends CompositeService {
private TajoSystemMetrics systemMetrics;
+ private HAService haService;
+
public TajoMaster() throws Exception {
super(TajoMaster.class.getName());
}
@@ -219,6 +223,20 @@ public class TajoMaster extends CompositeService {
}
}
+
+ private void initHAManger() throws Exception {
+ // If tajo provides haService based on ZooKeeper, following codes need to update.
+ if (systemConf.getBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ haService = new HAServiceHDFSImpl(context);
+ haService.register();
+ }
+ }
+
+ public boolean isActiveMaster() {
+ return (haService != null ? haService.isActiveStatus() : true);
+ }
+
+
private void checkAndInitializeSystemDirectories() throws IOException {
// Get Tajo root dir
this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
@@ -362,6 +380,12 @@ public class TajoMaster extends CompositeService {
}
initSystemMetrics();
+
+ try {
+ initHAManger();
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
}
private void writeSystemConf() throws IOException {
@@ -402,6 +426,14 @@ public class TajoMaster extends CompositeService {
@Override
public void stop() {
+ if (haService != null) {
+ try {
+ haService.delete();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+
if (webServer != null) {
try {
webServer.stop();
@@ -492,6 +524,10 @@ public class TajoMaster extends CompositeService {
public TajoSystemMetrics getSystemMetrics() {
return systemMetrics;
}
+
+ public HAService getHAService() {
+ return haService;
+ }
}
String getThreadTaskName(long id, String name) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
new file mode 100644
index 0000000..3d6669c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The HAService is responsible for setting active TajoMaster on startup or when the
+ * current active is changing (eg due to failure), monitoring the health of TajoMaster.
+ *
+ */
+public interface HAService {
+
+ /**
+ * Add master name to shared storage.
+ */
+ public void register() throws IOException;
+
+
+ /**
+ * Delete master name to shared storage.
+ *
+ */
+ public void delete() throws IOException;
+
+ /**
+ *
+ * @return True if current master is an active master.
+ */
+ public boolean isActiveStatus();
+
+ /**
+ *
+ * @return return all master list
+ * @throws IOException
+ */
+ public List<TajoMasterInfo> getMasters() throws IOException;
+
+}