You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2015/05/14 07:21:07 UTC
tajo git commit: TAJO-1586: TajoMaster HA startup failure on Yarn.
(jaehwa)
Repository: tajo
Updated Branches:
refs/heads/master 6977471e4 -> 31c4630d5
TAJO-1586: TajoMaster HA startup failure on Yarn. (jaehwa)
Closes #566
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/31c4630d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/31c4630d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/31c4630d
Branch: refs/heads/master
Commit: 31c4630d5d3ce0dee1df35491c557eefad15deeb
Parents: 6977471
Author: JaeHwa Jung <bl...@apache.org>
Authored: Thu May 14 14:17:54 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Thu May 14 14:19:55 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/cli/tools/TajoAdmin.java | 6 +-
.../org/apache/tajo/cli/tools/TajoHAAdmin.java | 9 +-
.../apache/tajo/client/DummyServiceTracker.java | 19 +-
.../java/org/apache/tajo/conf/TajoConf.java | 2 +
.../java/org/apache/tajo/ha/HAConstants.java | 1 +
.../java/org/apache/tajo/ha/HAServiceUtil.java | 253 ---------------
.../apache/tajo/service/BaseServiceTracker.java | 31 +-
.../apache/tajo/service/HAServiceTracker.java | 20 +-
.../org/apache/tajo/service/ServiceTracker.java | 28 +-
.../org/apache/tajo/ha/HdfsServiceTracker.java | 322 +++++++++++--------
.../java/org/apache/tajo/master/TajoMaster.java | 21 +-
.../main/java/org/apache/tajo/util/JSPUtil.java | 2 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 2 +
.../resources/webapps/admin/catalogview.jsp | 11 +-
.../main/resources/webapps/admin/cluster.jsp | 10 +-
.../src/main/resources/webapps/admin/index.jsp | 10 +-
.../src/main/resources/webapps/admin/query.jsp | 7 +-
.../resources/webapps/admin/query_executor.jsp | 9 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 28 +-
20 files changed, 346 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4912114..c9b2522 100644
--- a/CHANGES
+++ b/CHANGES
@@ -123,6 +123,8 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1586: TajoMaster HA startup failure on Yarn. (jaehwa)
+
TAJO-1598: TableMeta should change equals mechanism.
(Contributed by DaeMyung Kang, Committed by jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
index 4f56649..739cd54 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
@@ -28,9 +28,9 @@ import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
@@ -74,6 +74,7 @@ public class TajoAdmin {
private TajoConf tajoConf;
private TajoClient tajoClient;
private Writer writer;
+ private ServiceTracker serviceTracker;
public TajoAdmin(TajoConf tajoConf, Writer writer) {
this(tajoConf, writer, null);
@@ -83,6 +84,7 @@ public class TajoAdmin {
this.tajoConf = tajoConf;
this.writer = writer;
this.tajoClient = tajoClient;
+ serviceTracker = ServiceTrackerFactory.get(this.tajoConf);
}
private void printUsage() {
@@ -419,7 +421,7 @@ public class TajoAdmin {
if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- List<String> list = HAServiceUtil.getMasters(tajoConf);
+ List<String> list = serviceTracker.getMasters(tajoConf);
int i = 0;
for (String master : list) {
if (i > 0) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
index e25d7d4..834b6b1 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java
@@ -21,9 +21,8 @@ package org.apache.tajo.cli.tools;
import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import java.io.IOException;
@@ -45,6 +44,7 @@ public class TajoHAAdmin {
private TajoConf tajoConf;
private Writer writer;
+ private ServiceTracker serviceTracker;
public TajoHAAdmin(TajoConf tajoConf, Writer writer) {
this(tajoConf, writer, null);
@@ -53,6 +53,7 @@ public class TajoHAAdmin {
public TajoHAAdmin(TajoConf tajoConf, Writer writer, TajoClient tajoClient) {
this.tajoConf = tajoConf;
this.writer = writer;
+ serviceTracker = ServiceTrackerFactory.get(this.tajoConf);
}
private void printUsage() {
@@ -155,7 +156,7 @@ public class TajoHAAdmin {
private void getState(Writer writer, String param) throws ParseException, IOException,
ServiceException {
- int retValue = HAServiceUtil.getState(param, tajoConf);
+ int retValue = serviceTracker.getState(param, tajoConf);
switch (retValue) {
case 1:
@@ -175,7 +176,7 @@ public class TajoHAAdmin {
private void formatHA(Writer writer) throws ParseException, IOException,
ServiceException {
- int retValue = HAServiceUtil.formatHA(tajoConf);
+ int retValue = serviceTracker.formatHA(tajoConf);
switch (retValue) {
case 1:
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
index 762c2e7..cf826ea 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
@@ -18,6 +18,7 @@
package org.apache.tajo.client;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerException;
@@ -25,6 +26,7 @@ import org.apache.tajo.service.TajoMasterInfo;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
public class DummyServiceTracker implements ServiceTracker {
@@ -65,6 +67,21 @@ public class DummyServiceTracker implements ServiceTracker {
}
@Override
+ public int getState(String masterName, TajoConf conf) throws ServiceTrackerException {
+ return 0;
+ }
+
+ @Override
+ public int formatHA(TajoConf conf) throws ServiceTrackerException {
+ return 0;
+ }
+
+ @Override
+ public List<String> getMasters(TajoConf conf) throws ServiceTrackerException {
+ return new ArrayList<String>();
+ }
+
+ @Override
public void register() throws IOException {
}
@@ -73,7 +90,7 @@ public class DummyServiceTracker implements ServiceTracker {
}
@Override
- public boolean isActiveStatus() {
+ public boolean isActiveMaster() {
return true;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 46e7618..59b1f43 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -142,6 +142,8 @@ public class TajoConf extends Configuration {
// High availability configurations
TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
+ TAJO_MASTER_HA_CLIENT_RETRY_MAX_NUM("tajo.master.ha.client.read.retry.max-num", 120), // 120 retry
+ TAJO_MASTER_HA_CLIENT_RETRY_PAUSE_TIME("tajo.master.ha.client.read.pause-time", 500), // 500 ms
// Service discovery
DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()),
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java
index c5f4b8a..7af19c6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java
@@ -24,4 +24,5 @@ public class HAConstants {
public final static int RESOURCE_TRACKER_RPC_ADDRESS = 3;
public final static int CATALOG_ADDRESS = 4;
public final static int MASTER_INFO_ADDRESS = 5;
+ public final static String ACTIVE_LOCK_FILE = "active.lock";
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
deleted file mode 100644
index 7001228..0000000
--- a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.NetUtils;
-
-
-import javax.net.SocketFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-
-public class HAServiceUtil {
-
- public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS);
- }
-
- public static String getMasterClientName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
- }
-
- public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
- InetSocketAddress masterAddress = null;
-
- if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- FileSystem fs = getFileSystem(conf);
- Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-
- if (fs.exists(activePath)) {
- FileStatus[] files = fs.listStatus(activePath);
-
- if (files.length == 1) {
- Path file = files[0].getPath();
- String hostAddress = file.getName().replaceAll("_", ":");
- FSDataInputStream stream = fs.open(file);
- String data = stream.readUTF();
- stream.close();
-
- String[] addresses = data.split("_");
-
- switch (type) {
- case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(hostAddress);
- break;
- case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(addresses[0]);
- break;
- case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(addresses[1]);
- break;
- case HAConstants.CATALOG_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(addresses[2]);
- break;
- case HAConstants.MASTER_INFO_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(addresses[3]);
- break;
- default:
- break;
- }
- }
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- if (masterAddress == null) {
- switch (type) {
- case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
- break;
- case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
- .TAJO_MASTER_CLIENT_RPC_ADDRESS));
- break;
- case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS));
- break;
- case HAConstants.CATALOG_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
- .CATALOG_ADDRESS));
- break;
- case HAConstants.MASTER_INFO_ADDRESS:
- masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
- .TAJO_MASTER_INFO_ADDRESS));
- break;
- default:
- break;
- }
- }
-
- return masterAddress;
- }
-
- public static boolean isMasterAlive(String masterName, TajoConf conf) {
- boolean isAlive = true;
-
- try {
- // how to create sockets
- SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
-
- int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
- CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
-
- InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
-
- // connected socket
- Socket socket = socketFactory.createSocket();
- org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
- } catch (Exception e) {
- isAlive = false;
- }
- return isAlive;
- }
-
- public static int getState(String masterName, TajoConf conf) {
- String targetMaster = masterName.replaceAll(":", "_");
- int retValue = -1;
-
- try {
- FileSystem fs = getFileSystem(conf);
- Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
- Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
-
- Path temPath = null;
-
- // Check backup masters
- FileStatus[] files = fs.listStatus(backupPath);
- for (FileStatus status : files) {
- temPath = status.getPath();
- if (temPath.getName().equals(targetMaster)) {
- return 0;
- }
- }
-
- // Check active master
- files = fs.listStatus(activePath);
- if (files.length == 1) {
- temPath = files[0].getPath();
- if (temPath.getName().equals(targetMaster)) {
- return 1;
- }
- }
- retValue = -2;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return retValue;
- }
-
- public static int formatHA(TajoConf conf) {
- int retValue = -1;
- try {
- FileSystem fs = getFileSystem(conf);
- Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
- Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
- Path temPath = null;
-
- int aliveMasterCount = 0;
- // Check backup masters
- FileStatus[] files = fs.listStatus(backupPath);
- for (FileStatus status : files) {
- temPath = status.getPath();
- if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
- aliveMasterCount++;
- }
- }
-
- // Check active master
- files = fs.listStatus(activePath);
- if (files.length == 1) {
- temPath = files[0].getPath();
- if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
- aliveMasterCount++;
- }
- }
-
- // If there is any alive master, users can't format storage.
- if (aliveMasterCount > 0) {
- return 0;
- }
-
- // delete ha path.
- fs.delete(TajoConf.getSystemHADir(conf), true);
- retValue = 1;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return retValue;
- }
-
-
- public static List<String> getMasters(TajoConf conf) {
- List<String> list = new ArrayList<String>();
-
- try {
- FileSystem fs = getFileSystem(conf);
- Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
- Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
- Path temPath = null;
-
- // Check backup masters
- FileStatus[] files = fs.listStatus(backupPath);
- for (FileStatus status : files) {
- temPath = status.getPath();
- list.add(temPath.getName().replaceAll("_", ":"));
- }
-
- // Check active master
- files = fs.listStatus(activePath);
- if (files.length == 1) {
- temPath = files[0].getPath();
- list.add(temPath.getName().replaceAll("_", ":"));
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- return list;
- }
-
- private static FileSystem getFileSystem(TajoConf conf) throws IOException {
- Path rootPath = TajoConf.getTajoRootDir(conf);
- return rootPath.getFileSystem(conf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
index bf7fd2c..e598f2a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
+++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
@@ -77,6 +77,29 @@ public class BaseServiceTracker implements ServiceTracker {
}
@Override
+ public int getState(String masterName, TajoConf conf) throws ServiceTrackerException {
+ String masterAddress = getMasterAddress();
+
+ if (masterAddress.equals(masterName)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public int formatHA(TajoConf conf) throws ServiceTrackerException {
+ throw new ServiceTrackerException("Cannot format HA directories on non-HA mode");
+ }
+
+ @Override
+ public List<String> getMasters(TajoConf conf) throws ServiceTrackerException {
+ List<String> list = TUtil.newList();
+ list.add(getMasterAddress());
+ return list;
+ }
+
+ @Override
public void register() throws IOException {
}
@@ -85,7 +108,7 @@ public class BaseServiceTracker implements ServiceTracker {
}
@Override
- public boolean isActiveStatus() {
+ public boolean isActiveMaster() {
return true;
}
@@ -94,4 +117,10 @@ public class BaseServiceTracker implements ServiceTracker {
return tajoMasterInfos;
}
+ private String getMasterAddress() {
+ String masterAddress = tajoMasterInfo.getTajoMasterAddress().getAddress().getHostAddress() + ":" + tajoMasterInfo
+ .getTajoMasterAddress().getPort();
+
+ return masterAddress;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
index c808537..081b153 100644
--- a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
+++ b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
@@ -18,13 +18,18 @@
package org.apache.tajo.service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.FileUtil;
import javax.net.SocketFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
public abstract class HAServiceTracker implements ServiceTracker {
+ private static final Log LOG = LogFactory.getLog(HAServiceTracker.class);
static SocketFactory socketFactory = SocketFactory.getDefault();
@@ -32,16 +37,29 @@ public abstract class HAServiceTracker implements ServiceTracker {
return true;
}
+ public static boolean checkConnection(String address) {
+ return checkConnection(address, ":");
+ }
+
+ public static boolean checkConnection(String address, String delimiter) {
+ String[] hostAddress = address.split(delimiter);
+ InetSocketAddress socketAddress = new InetSocketAddress(hostAddress[0], Integer.parseInt(hostAddress[1]));
+ return checkConnection(socketAddress);
+ }
+
public static boolean checkConnection(InetSocketAddress address) {
boolean isAlive = true;
+ Socket socket = null;
try {
int connectionTimeout = 10;
- Socket socket = socketFactory.createSocket();
+ socket = socketFactory.createSocket();
NetUtils.connect(socket, address, connectionTimeout);
} catch (Exception e) {
isAlive = false;
+ } finally {
+ FileUtil.cleanup(LOG, socket);
}
return isAlive;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
index 73ff112..5888ff3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
@@ -18,46 +18,54 @@
package org.apache.tajo.service;
+import org.apache.tajo.conf.TajoConf;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
public interface ServiceTracker {
- public abstract boolean isHighAvailable();
+ boolean isHighAvailable();
+
+ InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException;
+
+ InetSocketAddress getClientServiceAddress() throws ServiceTrackerException;
+
+ InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException;
- public abstract InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException;
+ InetSocketAddress getCatalogAddress() throws ServiceTrackerException;
- public abstract InetSocketAddress getClientServiceAddress() throws ServiceTrackerException;
+ InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException;
- public abstract InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException;
+ int getState(String masterName, TajoConf conf) throws ServiceTrackerException;
- public abstract InetSocketAddress getCatalogAddress() throws ServiceTrackerException;
+ int formatHA(TajoConf conf) throws ServiceTrackerException;
- public abstract InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException;
+ List<String> getMasters(TajoConf conf) throws ServiceTrackerException;
/**
* Add master name to shared storage.
*/
- public void register() throws IOException;
+ void register() throws IOException;
/**
* Delete master name to shared storage.
*
*/
- public void delete() throws IOException;
+ void delete() throws IOException;
/**
*
* @return True if current master is an active master.
*/
- public boolean isActiveStatus();
+ boolean isActiveMaster();
/**
*
* @return return all master list
* @throws IOException
*/
- public List<TajoMasterInfo> getMasters() throws IOException;
+ List<TajoMasterInfo> getMasters() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
index 4a782ec..5f1aff8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -31,7 +32,8 @@ import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.service.HAServiceTracker;
import org.apache.tajo.service.ServiceTrackerException;
import org.apache.tajo.service.TajoMasterInfo;
-import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.*;
+import org.apache.tajo.util.FileUtil;
import javax.net.SocketFactory;
import java.io.IOException;
@@ -58,7 +60,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
private Path activePath;
private Path backupPath;
- private boolean isActiveStatus = false;
+ private boolean isActiveMaster = false;
//thread which runs periodically to see the last time since a heartbeat is received.
private Thread checkerThread;
@@ -74,8 +76,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
InetSocketAddress socketAddress = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
-
- monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+ this.monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
}
private void initSystemDirectory() throws IOException {
@@ -113,87 +114,144 @@ public class HdfsServiceTracker extends HAServiceTracker {
}
}
+ /**
+ * It creates a string of the following form:
+ *
+ * <pre>
+ * {UMBILICAL_RPC_HOST:PORT}_{CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT}
+ * </pre>
+ *
+ * @throws IOException
+ */
@Override
public void register() throws IOException {
- FileStatus[] files = fs.listStatus(activePath);
+ // Try to create the lock file; the result is false if it already exists.
+ boolean lockResult = createLockFile();
+
+ String fileName = masterName.replaceAll(":", "_");
+ Path activeFile = new Path(activePath, fileName);
+ Path backupFile = new Path(backupPath, fileName);
+
+ // Build the underscore-separated string of this master's server addresses.
+ StringBuilder sb = new StringBuilder();
+ InetSocketAddress address = getHostAddress(HAConstants.MASTER_UMBILICAL_RPC_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+ address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+ address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+ address = getHostAddress(HAConstants.CATALOG_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+ address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
+ sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
// Phase 1: If there is not another active master, this try to become active master.
- if (files.length == 0) {
- createMasterFile(true);
+ if (lockResult) {
+ fs.delete(backupFile, false);
+ createMasterFile(activeFile, sb);
currentActiveMaster = masterName;
+ writeSystemConf();
LOG.info(String.format("This is added to active master (%s)", masterName));
} else {
// Phase 2: If there is active master information, we need to check its status.
- Path activePath = files[0].getPath();
- currentActiveMaster = activePath.getName().replaceAll("_", ":");
+ FileStatus[] files = fs.listStatus(activePath);
+ Path existingActiveFile = null;
+ if (files.length > 2) {
+ throw new ServiceTrackerException("Three or more than active master entries.");
+ }
+ for(FileStatus eachFile : files) {
+ if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE)) {
+ existingActiveFile = eachFile.getPath();
+ }
+ }
+ currentActiveMaster = existingActiveFile.getName().replaceAll("_", ":");
// Phase 3: If current active master is dead, this master should be active master.
- if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
- fs.delete(activePath, true);
- createMasterFile(true);
+ if (!checkConnection(currentActiveMaster)) {
+ fs.delete(existingActiveFile, false);
+ fs.delete(backupFile, false);
+ createMasterFile(activeFile, sb);
currentActiveMaster = masterName;
LOG.info(String.format("This is added to active master (%s)", masterName));
} else {
// Phase 4: If current active master is alive, this master need to be backup master.
- createMasterFile(false);
- LOG.info(String.format("This is added to backup masters (%s)", masterName));
+ if (masterName.equals(currentActiveMaster)) {
+ LOG.info(String.format("This has already been added to active master (%s)", masterName));
+ } else {
+ if (fs.exists(backupFile)) {
+ LOG.info(String.format("This has already been added to backup masters (%s)", masterName));
+ } else {
+ createMasterFile(backupFile, sb);
+ LOG.info(String.format("This is added to backup master (%s)", masterName));
+ }
+ }
}
}
+ startPingChecker();
}
/**
- * It will creates the following form string. It includes
+ * Writes the system configuration as XML to the system conf path.
*
- * <pre>
- * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT}
- * </pre>
- *
- * @param isActive A boolean flag to indicate if it is for master or not.
* @throws IOException
*/
- private void createMasterFile(boolean isActive) throws IOException {
- String fileName = masterName.replaceAll(":", "_");
- Path path = null;
+ private void writeSystemConf() throws IOException {
+ Path systemConfPath = TajoConf.getSystemConfPath(conf);
- if (isActive) {
- path = new Path(activePath, fileName);
- } else {
- path = new Path(backupPath, fileName);
+ FSDataOutputStream out = FileSystem.create(fs, systemConfPath,
+ new FsPermission(TajoMaster.SYSTEM_CONF_FILE_PERMISSION));
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
}
+ fs.setReplication(systemConfPath, (short) conf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
+ }
- StringBuilder sb = new StringBuilder();
- InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.CATALOG_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+ private boolean createLockFile() throws IOException {
+ boolean result = false;
+ FSDataOutputStream lockOutput = null;
- address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
+ Path lockFile = new Path(activePath, HAConstants.ACTIVE_LOCK_FILE);
+ try {
+ lockOutput = fs.create(lockFile, false);
+ lockOutput.hsync();
+ lockOutput.close();
+ fs.deleteOnExit(lockFile);
+ result = true;
+ } catch (FileAlreadyExistsException e) {
+ LOG.info(String.format("Lock file already exists at (%s)", lockFile.toString()));
+ result = false;
+ } catch (Exception e) {
+ throw new IOException("Lock file creation is failed - " + e.getMessage());
+ } finally {
+ FileUtil.cleanup(LOG, lockOutput);
+ }
- FSDataOutputStream out = fs.create(path);
+ return result;
+ }
+ private void createMasterFile(Path path, StringBuilder sb) throws IOException {
+ FSDataOutputStream out = null;
try {
+ out = fs.create(path, false);
+
out.writeUTF(sb.toString());
out.hsync();
out.close();
- } catch (FileAlreadyExistsException e) {
- createMasterFile(false);
- }
- if (isActive) {
- isActiveStatus = true;
- } else {
- isActiveStatus = false;
+ fs.deleteOnExit(path);
+ } catch (Exception e) {
+ throw new IOException("File creation is failed - " + e.getMessage());
+ } finally {
+ FileUtil.cleanup(LOG, out);
}
-
- startPingChecker();
}
-
private InetSocketAddress getHostAddress(int type) {
InetSocketAddress address = null;
@@ -226,65 +284,61 @@ public class HdfsServiceTracker extends HAServiceTracker {
@Override
public void delete() throws IOException {
+ if (ShutdownHookManager.get().isShutdownInProgress()) return;
+
String fileName = masterName.replaceAll(":", "_");
- Path activeFile = new Path(activePath, fileName);
- if (fs.exists(activeFile)) {
- fs.delete(activeFile, true);
- }
+ fs.delete(new Path(activePath, fileName), false);
+ fs.delete(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE), false);
+ fs.delete(new Path(backupPath, fileName), false);
- Path backupFile = new Path(backupPath, fileName);
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true);
- }
- if (isActiveStatus) {
- isActiveStatus = false;
- }
stopped = true;
}
@Override
- public boolean isActiveStatus() {
- return isActiveStatus;
+ public boolean isActiveMaster() {
+ if (currentActiveMaster.equals(masterName)) {
+ return true;
+ } else {
+ return false;
+ }
}
@Override
public List<TajoMasterInfo> getMasters() throws IOException {
List<TajoMasterInfo> list = TUtil.newList();
- Path path = null;
FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 1) {
- path = files[0].getPath();
- list.add(createTajoMasterInfo(path, true));
+ for(FileStatus status : files) {
+ if (!status.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE)) {
+ list.add(getTajoMasterInfo(status.getPath(), true));
+ }
}
files = fs.listStatus(backupPath);
for (FileStatus status : files) {
- path = status.getPath();
- list.add(createTajoMasterInfo(path, false));
+ list.add(getTajoMasterInfo(status.getPath(), false));
}
return list;
}
- private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+ private TajoMasterInfo getTajoMasterInfo(Path path, boolean isActive) throws IOException {
String masterAddress = path.getName().replaceAll("_", ":");
- boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+ boolean isAlive = checkConnection(masterAddress);
FSDataInputStream stream = fs.open(path);
String data = stream.readUTF();
-
stream.close();
String[] addresses = data.split("_");
TajoMasterInfo info = new TajoMasterInfo();
- info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
- info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
- info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
- info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
- info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+ info.setTajoMasterAddress(NetUtils.createSocketAddr(addresses[0]));
+ info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[1]));
+ info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[2]));
+ info.setCatalogAddress(NetUtils.createSocketAddr(addresses[3]));
+ info.setWebServerAddress(NetUtils.createSocketAddr(addresses[4]));
info.setAvailable(isAlive);
info.setActive(isActive);
@@ -299,21 +353,18 @@ public class HdfsServiceTracker extends HAServiceTracker {
synchronized (HdfsServiceTracker.this) {
try {
if (!currentActiveMaster.equals(masterName)) {
- boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
if (LOG.isDebugEnabled()) {
- LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
- + ", isAlive:" + isAlive);
+ LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName);
}
// If active master is dead, this master should be active master instead of
// previous active master.
- if (!isAlive) {
- FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 0 || (files.length == 1
- && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
- delete();
- register();
- }
+ if (!checkConnection(currentActiveMaster)) {
+ Path activeFile = new Path(activePath, currentActiveMaster.replaceAll(":", "_"));
+ fs.delete(activeFile, false);
+ Path lockFile = new Path(activePath, HAConstants.ACTIVE_LOCK_FILE);
+ fs.delete(lockFile, false);
+ register();
}
}
} catch (Exception e) {
@@ -345,7 +396,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
@Override
public InetSocketAddress getUmbilicalAddress() {
if (!checkConnection(umbilicalRpcAddr)) {
- umbilicalRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_UMBILICAL_RPC_ADDRESS));
+ umbilicalRpcAddr = NetUtils.createSocketAddr(getAddressElements().get(MASTER_UMBILICAL_RPC_ADDRESS));
}
return umbilicalRpcAddr;
@@ -354,7 +405,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
@Override
public InetSocketAddress getClientServiceAddress() {
if (!checkConnection(clientRpcAddr)) {
- clientRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_CLIENT_RPC_ADDRESS));
+ clientRpcAddr = NetUtils.createSocketAddr(getAddressElements().get(MASTER_CLIENT_RPC_ADDRESS));
}
return clientRpcAddr;
@@ -363,7 +414,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
@Override
public InetSocketAddress getResourceTrackerAddress() {
if (!checkConnection(resourceTrackerRpcAddr)) {
- resourceTrackerRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(RESOURCE_TRACKER_RPC_ADDRESS));
+ resourceTrackerRpcAddr = NetUtils.createSocketAddr(getAddressElements().get(RESOURCE_TRACKER_RPC_ADDRESS));
}
return resourceTrackerRpcAddr;
@@ -372,7 +423,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
@Override
public InetSocketAddress getCatalogAddress() {
if (!checkConnection(catalogAddr)) {
- catalogAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(CATALOG_ADDRESS));
+ catalogAddr = NetUtils.createSocketAddr(getAddressElements().get(CATALOG_ADDRESS));
}
return catalogAddr;
@@ -381,7 +432,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
@Override
public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
if (!checkConnection(masterHttpInfoAddr)) {
- masterHttpInfoAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_HTTP_INFO));
+ masterHttpInfoAddr = NetUtils.createSocketAddr(getAddressElements().get(MASTER_HTTP_INFO));
}
return masterHttpInfoAddr;
@@ -390,11 +441,10 @@ public class HdfsServiceTracker extends HAServiceTracker {
/**
* Reads a text file stored in HDFS file, and then return all service addresses read from a HDFS file. *
*
- * @param conf
* @return all service addresses
* @throws ServiceTrackerException
*/
- private static List<String> getAddressElements(TajoConf conf) throws ServiceTrackerException {
+ private synchronized List<String> getAddressElements() throws ServiceTrackerException {
try {
FileSystem fs = getFileSystem(conf);
@@ -408,15 +458,34 @@ public class HdfsServiceTracker extends HAServiceTracker {
}
FileStatus[] files = fs.listStatus(activeMasterBaseDir);
+ /* wait for active master from HDFS */
+ int pause = conf.getIntVar(ConfVars.TAJO_MASTER_HA_CLIENT_RETRY_PAUSE_TIME);
+ int maxRetry = conf.getIntVar(ConfVars.TAJO_MASTER_HA_CLIENT_RETRY_MAX_NUM);
+ int retry = 0;
+
+ while (files.length < 2 && retry < maxRetry) {
+ try {
+ this.wait(pause);
+ } catch (InterruptedException e) {
+ throw new ServiceTrackerException(e);
+ }
+ files = fs.listStatus(activeMasterBaseDir);
+ }
if (files.length < 1) {
+ LOG.error("Exceeded the maximum retry (" + maxRetry + ") to read TajoMaster address from HDFS");
throw new ServiceTrackerException("No active master entry");
- } else if (files.length > 1) {
- throw new ServiceTrackerException("Two or more than active master entries.");
+ } else if (files.length > 2) {
+ throw new ServiceTrackerException("Three or more than active master entries.");
}
- // We can ensure that there is only one file due to the above assertion.
- Path activeMasterEntry = files[0].getPath();
+ Path activeMasterEntry = null;
+
+ for (FileStatus eachFile : files) {
+ if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE)) {
+ activeMasterEntry = eachFile.getPath();
+ }
+ }
if (!fs.isFile(activeMasterEntry)) {
throw new ServiceTrackerException("Active master entry must be a file, but it is a directory.");
@@ -424,12 +493,9 @@ public class HdfsServiceTracker extends HAServiceTracker {
List<String> addressElements = TUtil.newList();
- addressElements.add(activeMasterEntry.getName().replaceAll("_", ":")); // Add UMBILICAL_RPC_ADDRESS to elements
-
FSDataInputStream stream = fs.open(activeMasterEntry);
String data = stream.readUTF();
stream.close();
-
addressElements.addAll(TUtil.newList(data.split("_"))); // Add remains entries to elements
// ensure the number of entries
@@ -442,33 +508,8 @@ public class HdfsServiceTracker extends HAServiceTracker {
}
}
-
- public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
- return isMasterAlive(org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(masterAddress), conf);
- }
-
- public static boolean isMasterAlive(String masterName, TajoConf conf) {
- boolean isAlive = true;
-
- try {
- // how to create sockets
- SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
-
- int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
- CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
-
- InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
-
- // connected socket
- Socket socket = socketFactory.createSocket();
- org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
- } catch (Exception e) {
- isAlive = false;
- }
- return isAlive;
- }
-
- public static int getState(String masterName, TajoConf conf) {
+ @Override
+ public int getState(String masterName, TajoConf conf) throws ServiceTrackerException {
String targetMaster = masterName.replaceAll(":", "_");
int retValue = -1;
@@ -498,12 +539,13 @@ public class HdfsServiceTracker extends HAServiceTracker {
}
retValue = -2;
} catch (Exception e) {
- e.printStackTrace();
+ throw new ServiceTrackerException("Cannot get HA state - ERROR:" + e.getMessage());
}
return retValue;
}
- public static int formatHA(TajoConf conf) {
+ @Override
+ public int formatHA(TajoConf conf) throws ServiceTrackerException{
int retValue = -1;
try {
FileSystem fs = getFileSystem(conf);
@@ -512,20 +554,20 @@ public class HdfsServiceTracker extends HAServiceTracker {
Path temPath = null;
int aliveMasterCount = 0;
+
// Check backup masters
FileStatus[] files = fs.listStatus(backupPath);
- for (FileStatus status : files) {
- temPath = status.getPath();
- if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+ for (FileStatus eachFile : files) {
+ if (checkConnection(eachFile.getPath().getName(), "_")) {
aliveMasterCount++;
}
}
// Check active master
files = fs.listStatus(activePath);
- if (files.length == 1) {
- temPath = files[0].getPath();
- if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) {
+ for (FileStatus eachFile : files) {
+ if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE) &&
+ checkConnection(eachFile.getPath().getName(), "_")) {
aliveMasterCount++;
}
}
@@ -539,13 +581,13 @@ public class HdfsServiceTracker extends HAServiceTracker {
fs.delete(TajoConf.getSystemHADir(conf), true);
retValue = 1;
} catch (Exception e) {
- e.printStackTrace();
+ throw new ServiceTrackerException("Cannot format HA directories - ERROR:" + e.getMessage());
}
return retValue;
}
-
- public static List<String> getMasters(TajoConf conf) {
+ @Override
+ public List<String> getMasters(TajoConf conf) throws ServiceTrackerException {
List<String> list = new ArrayList<String>();
try {
@@ -569,7 +611,7 @@ public class HdfsServiceTracker extends HAServiceTracker {
}
} catch (Exception e) {
- e.printStackTrace();
+ throw new ServiceTrackerException("Cannot get master lists - ERROR:" + e.getMessage());
}
return list;
}
@@ -578,4 +620,4 @@ public class HdfsServiceTracker extends HAServiceTracker {
Path rootPath = TajoConf.getTajoRootDir(conf);
return rootPath.getFileSystem(conf);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index d6ae49c..fb2a160 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -245,11 +245,6 @@ public class TajoMaster extends CompositeService {
}
}
- public boolean isActiveMaster() {
- return (haService != null ? haService.isActiveStatus() : true);
- }
-
-
private void checkAndInitializeSystemDirectories() throws IOException {
// Get Tajo root dir
this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
@@ -360,14 +355,18 @@ public class TajoMaster extends CompositeService {
defaultFS.delete(systemConfPath, false);
}
- FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath,
+ // In TajoMaster HA, some master might see LeaseExpiredException because of lease mismatch. Thus,
+ // we need to create below xml file at HdfsServiceTracker::writeSystemConf.
+ if (!systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath,
new FsPermission(SYSTEM_CONF_FILE_PERMISSION));
- try {
- systemConf.writeXml(out);
- } finally {
- out.close();
+ try {
+ systemConf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
}
- defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
}
private void checkBaseTBSpaceAndDatabase() throws IOException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index aee2ced..578b15a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -197,7 +197,7 @@ public class JSPUtil {
ServiceTracker haService = context.getHAService();
String activeLabel = "";
if (haService != null) {
- if (haService.isActiveStatus()) {
+ if (haService.isActiveMaster()) {
activeLabel = "<font color='#1e90ff'>(active)</font>";
} else {
activeLabel = "<font color='#1e90ff'>(backup)</font>";
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index b666f80..0cecd73 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -40,6 +40,7 @@ import org.apache.tajo.function.FunctionSignature;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.service.TajoMasterInfo;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
@@ -322,6 +323,7 @@ public class TajoWorker extends CompositeService {
startJvmPauseMonitor();
tajoMasterInfo = new TajoMasterInfo();
+
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress());
tajoMasterInfo.setWorkerResourceTrackerAddr(serviceTracker.getResourceTrackerAddress());
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
index e014379..43ec5ca 100644
--- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
@@ -30,8 +30,15 @@
<%@ page import="java.util.Collection" %>
<%@ page import="java.util.List" %>
<%@ page import="java.util.Map" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="java.net.InetSocketAddress" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+
+ String[] masterName = master.getMasterName().split(":");
+ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1]));
+ String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort();
+
CatalogService catalog = master.getCatalog();
String catalogType = request.getParameter("type");
@@ -62,7 +69,7 @@
ServiceTracker haService = master.getContext().getHAService();
String activeLabel = "";
if (haService != null) {
- if (haService.isActiveStatus()) {
+ if (haService.isActiveMaster()) {
activeLabel = "<font color='#1e90ff'>(active)</font>";
} else {
activeLabel = "<font color='#1e90ff'>(backup)</font>";
@@ -80,7 +87,7 @@
<body>
<%@ include file="header.jsp"%>
<div class='contents'>
- <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2>
+ <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2>
<hr/>
<h3>Catalog</h3>
<div>
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 816a144..97ca698 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -31,9 +31,15 @@
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.util.*" %>
<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="java.net.InetSocketAddress" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+
+ String[] masterName = master.getMasterName().split(":");
+ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1]));
+ String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort();
+
Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
Collections.sort(wokerKeys);
@@ -72,7 +78,7 @@
String activeLabel = "";
if (haService != null) {
- if (haService.isActiveStatus()) {
+ if (haService.isActiveMaster()) {
activeLabel = "<font color='#1e90ff'>(active)</font>";
} else {
activeLabel = "<font color='#1e90ff'>(backup)</font>";
@@ -105,7 +111,7 @@
<body>
<%@ include file="header.jsp"%>
<div class='contents'>
- <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2>
+ <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2>
<div>Live:<%=numLiveMasters%>, Dead: <%=deadMasterHtml%>, Total: <%=masters.size()%></div>
<%
if (masters != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 468fc72..e0cf876 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -35,10 +35,16 @@
<%@ page import="java.util.Collection" %>
<%@ page import="java.util.Date" %>
<%@ page import="java.util.Map" %>
+<%@ page import="java.net.InetSocketAddress" %>
<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+
+ String[] masterName = master.getMasterName().split(":");
+ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1]));
+ String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort();
+
Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
Map<Integer, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
@@ -83,7 +89,7 @@
String activeLabel = "";
if (haService != null) {
- if (haService.isActiveStatus()) {
+ if (haService.isActiveMaster()) {
activeLabel = "<font color='#1e90ff'>(active)</font>";
} else {
activeLabel = "<font color='#1e90ff'>(backup)</font>";
@@ -114,7 +120,7 @@
<body>
<%@ include file="header.jsp"%>
<div class='contents'>
- <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2>
+ <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2>
<hr/>
<h3>Master Status</h3>
<table border='0'>
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 43e7775..ca376bb 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -29,10 +29,15 @@
<%@ page import="java.util.*" %>
<%@ page import="org.apache.tajo.util.history.HistoryReader" %>
<%@ page import="org.apache.tajo.master.QueryInfo" %>
+<%@ page import="java.net.InetSocketAddress" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ String[] masterName = master.getMasterName().split(":");
+ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1]));
+ String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort();
+
List<QueryInProgress> runningQueries =
new ArrayList<QueryInProgress>(master.getContext().getQueryJobManager().getSubmittedQueries());
@@ -113,7 +118,7 @@
<body>
<%@ include file="header.jsp"%>
<div class='contents'>
- <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2>
+ <h2>Tajo Master: <%=masterLabel%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2>
<hr/>
<h3>Running Queries</h3>
<%
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
index a0f9a0a..1a58583 100644
--- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
@@ -22,14 +22,19 @@
<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="javax.xml.ws.Service" %>
+<%@ page import="java.net.InetSocketAddress" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ String[] masterName = master.getMasterName().split(":");
+ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1]));
+ String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort();
+
ServiceTracker haService = master.getContext().getHAService();
String activeLabel = "";
if (haService != null) {
- if (haService.isActiveStatus()) {
+ if (haService.isActiveMaster()) {
activeLabel = "<font color='#1e90ff'>(active)</font>";
} else {
activeLabel = "<font color='#1e90ff'>(backup)</font>";
@@ -288,7 +293,7 @@ function getPage() {
<body>
<%@ include file="header.jsp"%>
<div class='contents'>
- <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2>
+ <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2>
<hr/>
<h3>Query</h3>
Database :
http://git-wip-us.apache.org/repos/asf/tajo/blob/31c4630d/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index 6415588..c714749 100644
--- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -30,6 +30,7 @@ import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.junit.Test;
+import static junit.framework.Assert.assertTrue;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.*;
@@ -68,12 +69,12 @@ public class TestHAServiceHDFSImpl {
verifySystemDirectories(fs);
- Path backupMasterFile = new Path(backupPath, backupMaster.getMasterName()
- .replaceAll(":", "_"));
- assertTrue(fs.exists(backupMasterFile));
+ assertEquals(2, fs.listStatus(activePath).length);
+ assertEquals(1, fs.listStatus(backupPath).length);
- assertTrue(cluster.getMaster().isActiveMaster());
- assertFalse(backupMaster.isActiveMaster());
+ assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE)));
+ assertTrue(fs.exists(new Path(activePath, cluster.getMaster().getMasterName().replaceAll(":", "_"))));
+ assertTrue(fs.exists(new Path(backupPath, backupMaster.getMasterName().replaceAll(":", "_"))));
createDatabaseAndTable();
verifyDataBaseAndTable();
@@ -81,13 +82,14 @@ public class TestHAServiceHDFSImpl {
cluster.getMaster().stop();
- Thread.sleep(7000);
-
- assertFalse(cluster.getMaster().isActiveMaster());
- assertTrue(backupMaster.isActiveMaster());
-
client = cluster.newTajoClient();
verifyDataBaseAndTable();
+
+ assertEquals(2, fs.listStatus(activePath).length);
+ assertEquals(0, fs.listStatus(backupPath).length);
+
+ assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE)));
+ assertTrue(fs.exists(new Path(activePath, backupMaster.getMasterName().replaceAll(":", "_"))));
} finally {
client.close();
backupMaster.stop();
@@ -107,11 +109,12 @@ public class TestHAServiceHDFSImpl {
conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS,
masterAddress + ":" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS,
- masterAddress + ":" + NetUtils.getFreeSocketPort());
+ masterAddress + ":" + NetUtils.getFreeSocketPort());
conf.setIntVar(TajoConf.ConfVars.REST_SERVICE_PORT,
NetUtils.getFreeSocketPort());
conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true);
+ conf.setIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL, 1000);
//Client API service RPC Server
conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
@@ -134,9 +137,6 @@ public class TestHAServiceHDFSImpl {
backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
assertTrue(fs.exists(backupPath));
-
- assertEquals(1, fs.listStatus(activePath).length);
- assertEquals(1, fs.listStatus(backupPath).length);
}
private void createDatabaseAndTable() throws Exception {