You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/02/02 13:29:09 UTC
[04/10] tajo git commit: TAJO-1306: HAServiceUtil should not directly
use HDFS.
TAJO-1306: HAServiceUtil should not directly use HDFS.
Closes #358
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4595375f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4595375f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4595375f
Branch: refs/heads/index_support
Commit: 4595375f7e6b62436e0d4bf88a8aef1ca680c726
Parents: 015913b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 28 09:23:20 2015 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 28 09:23:20 2015 -0800
----------------------------------------------------------------------
.../tajo/catalog/AbstractCatalogClient.java | 12 +-
.../apache/tajo/client/DummyServiceTracker.java | 84 +++
.../apache/tajo/client/SessionConnection.java | 44 +-
.../org/apache/tajo/client/TajoClientImpl.java | 38 +-
.../apache/tajo/client/TajoHAClientUtil.java | 14 +-
.../java/org/apache/tajo/conf/TajoConf.java | 7 +-
.../java/org/apache/tajo/ha/HAServiceUtil.java | 39 --
.../apache/tajo/service/BaseServiceTracker.java | 97 ++++
.../apache/tajo/service/HAServiceTracker.java | 48 ++
.../org/apache/tajo/service/ServiceTracker.java | 63 ++
.../tajo/service/ServiceTrackerException.java | 30 +
.../tajo/service/ServiceTrackerFactory.java | 41 ++
.../org/apache/tajo/service/TajoMasterInfo.java | 89 +++
.../org/apache/tajo/benchmark/BenchmarkSet.java | 15 +-
.../main/java/org/apache/tajo/ha/HAService.java | 56 --
.../org/apache/tajo/ha/HAServiceHDFSImpl.java | 316 ----------
.../org/apache/tajo/ha/HdfsServiceTracker.java | 576 +++++++++++++++++++
.../java/org/apache/tajo/ha/TajoMasterInfo.java | 89 ---
.../apache/tajo/master/TajoContainerProxy.java | 27 +-
.../java/org/apache/tajo/master/TajoMaster.java | 30 +-
.../apache/tajo/querymaster/QueryMaster.java | 66 +--
.../main/java/org/apache/tajo/util/JSPUtil.java | 12 +-
.../tajo/worker/TajoResourceAllocator.java | 28 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 46 +-
.../tajo/worker/TajoWorkerClientService.java | 2 +-
.../tajo/worker/WorkerHeartbeatService.java | 20 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 11 +-
.../resources/webapps/admin/catalogview.jsp | 5 +-
.../main/resources/webapps/admin/cluster.jsp | 7 +-
.../src/main/resources/webapps/admin/index.jsp | 7 +-
.../resources/webapps/admin/query_executor.jsp | 5 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 10 +-
.../org/apache/tajo/jdbc/JdbcConnection.java | 10 +-
.../org/apache/tajo/jdbc/TajoStatement.java | 4 +-
.../org/apache/tajo/storage/StorageUtil.java | 16 -
35 files changed, 1156 insertions(+), 808 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 1a2fd44..718f7d6 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,12 +29,13 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.ProtoUtil;
import java.net.InetSocketAddress;
@@ -48,6 +49,7 @@ import java.util.List;
public abstract class AbstractCatalogClient implements CatalogService {
private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
+ protected ServiceTracker serviceTracker;
protected RpcConnectionPool pool;
protected InetSocketAddress catalogServerAddr;
protected TajoConf conf;
@@ -57,6 +59,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
this.pool = RpcConnectionPool.getPool();
this.catalogServerAddr = catalogServerAddr;
+ this.serviceTracker = ServiceTrackerFactory.get(conf);
this.conf = conf;
}
@@ -64,14 +67,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
if (catalogServerAddr == null) {
return null;
} else {
+
if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
return catalogServerAddr;
} else {
- if (!HAServiceUtil.isMasterAlive(catalogServerAddr, conf)) {
- return HAServiceUtil.getCatalogAddress(conf);
- } else {
- return catalogServerAddr;
- }
+ return serviceTracker.getCatalogAddress();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
new file mode 100644
index 0000000..762c2e7
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class DummyServiceTracker implements ServiceTracker {
+ private InetSocketAddress address;
+
+ public DummyServiceTracker(InetSocketAddress address) {
+ this.address = address;
+ }
+
+ @Override
+ public boolean isHighAvailable() {
+ return false;
+ }
+
+ @Override
+ public InetSocketAddress getUmbilicalAddress() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public InetSocketAddress getClientServiceAddress() {
+ return address;
+ }
+
+ @Override
+ public InetSocketAddress getResourceTrackerAddress() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public InetSocketAddress getCatalogAddress() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void register() throws IOException {
+ }
+
+ @Override
+ public void delete() throws IOException {
+ }
+
+ @Override
+ public boolean isActiveStatus() {
+ return true;
+ }
+
+ @Override
+ public List<TajoMasterInfo> getMasters() throws IOException {
+ throw new UnsupportedException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 5490be4..3e2b9cc 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,12 +21,10 @@ package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -34,6 +32,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
@@ -58,8 +57,6 @@ public class SessionConnection implements Closeable {
private final TajoConf conf;
- final InetSocketAddress tajoMasterAddr;
-
final RpcConnectionPool connPool;
private final String baseDatabase;
@@ -73,41 +70,29 @@ public class SessionConnection implements Closeable {
/** session variable cache */
private final Map<String, String> sessionVarsCache = new HashMap<String, String>();
-
- public SessionConnection(TajoConf conf) throws IOException {
- this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
- }
-
- public SessionConnection(TajoConf conf, @Nullable String baseDatabase) throws IOException {
- this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
- }
-
- public SessionConnection(InetSocketAddress addr) throws IOException {
- this(new TajoConf(), addr, null);
- }
-
- public SessionConnection(String hostname, int port, String baseDatabase) throws IOException {
- this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase);
- }
+ private ServiceTracker serviceTracker;
/**
* Connect to TajoMaster
*
* @param conf TajoConf
- * @param addr TajoMaster address
+ * @param tracker ServiceTracker used to look up the TajoMaster client service address
* @param baseDatabase The base database name. It is case sensitive. If it is null,
* the 'default' database will be used.
* @throws java.io.IOException
*/
- public SessionConnection(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
+ public SessionConnection(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase)
+ throws IOException {
+
this.conf = conf;
this.conf.set("tajo.disk.scheduler.report.interval", "0");
- this.tajoMasterAddr = addr;
int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
// Don't share connection pool per client
connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
userInfo = UserRoleInfo.getCurrentUser();
this.baseDatabase = baseDatabase != null ? baseDatabase : null;
+
+ this.serviceTracker = tracker;
}
public Map<String, String> getClientSideSessionVars() {
@@ -140,7 +125,8 @@ public class SessionConnection implements Closeable {
public boolean isConnected() {
if(!closed.get()){
try {
- return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
+ return connPool.getConnection(serviceTracker.getClientServiceAddress(),
+ TajoMasterClientProtocol.class, false).isConnected();
} catch (Throwable e) {
return false;
}
@@ -309,15 +295,7 @@ public class SessionConnection implements Closeable {
}
protected InetSocketAddress getTajoMasterAddr() {
- if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return tajoMasterAddr;
- } else {
- if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
- return HAServiceUtil.getMasterClientAddress(conf);
- } else {
- return tajoMasterAddr;
- }
- }
+ return serviceTracker.getClientServiceAddress();
}
protected void checkSessionAndGet(NettyClientBase client) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 8eafc91..f8eef28 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -32,8 +32,6 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.jdbc.TajoResultSet;
@@ -41,7 +39,8 @@ import org.apache.tajo.rule.EvaluationContext;
import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
-import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -56,41 +55,30 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
QueryClient queryClient;
CatalogAdminClient catalogClient;
- public TajoClientImpl(TajoConf conf) throws IOException {
- this(conf, TajoHAClientUtil.getRpcClientAddress(conf), null);
- }
-
- public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
- this(conf, TajoHAClientUtil.getRpcClientAddress(conf), baseDatabase);
- }
-
- public TajoClientImpl(InetSocketAddress addr) throws IOException {
- this(new TajoConf(), addr, null);
- }
-
/**
* Connect to TajoMaster
*
* @param conf TajoConf
- * @param addr TajoMaster address
+ * @param tracker ServiceTracker used to discover the Tajo client RPC address
* @param baseDatabase The base database name. It is case sensitive. If it is null,
* the 'default' database will be used.
* @throws java.io.IOException
*/
- public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
- super(conf, addr, baseDatabase);
+ public TajoClientImpl(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) throws IOException {
+ super(conf, tracker, baseDatabase);
+
this.queryClient = new QueryClientImpl(this);
this.catalogClient = new CatalogAdminClientImpl(this);
-
+
diagnoseTajoClient();
}
- public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException {
- super(hostName, port, baseDatabase);
- this.queryClient = new QueryClientImpl(this);
- this.catalogClient = new CatalogAdminClientImpl(this);
-
- diagnoseTajoClient();
+ public TajoClientImpl(TajoConf conf) throws IOException {
+ this(conf, ServiceTrackerFactory.get(conf), null);
+ }
+
+ public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
+ this(conf, ServiceTrackerFactory.get(conf), baseDatabase);
}
private void diagnoseTajoClient() throws EvaluationFailedException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
index 12a9ec8..7267b10 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -40,10 +40,8 @@ import com.google.protobuf.ServiceException;
import org.apache.tajo.cli.tsql.TajoCli.TajoCliContext;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.util.NetUtils;
import java.io.IOException;
-import java.net.InetSocketAddress;
public class TajoHAClientUtil {
/**
@@ -65,6 +63,7 @@ public class TajoHAClientUtil {
TajoCliContext context) throws IOException, ServiceException {
if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+
if (!HAServiceUtil.isMasterAlive(conf.getVar(
TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
TajoClient tajoClient = null;
@@ -85,15 +84,4 @@ public class TajoHAClientUtil {
return client;
}
}
-
-
- public static InetSocketAddress getRpcClientAddress(TajoConf conf) {
- if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return NetUtils.createSocketAddr(HAServiceUtil.getMasterClientName(conf));
- } else {
- return NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars
- .TAJO_MASTER_CLIENT_RPC_ADDRESS));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1bb96bc..fe5ff54 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.ConfigKey;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.service.BaseServiceTracker;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.TUtil;
@@ -134,10 +135,14 @@ public class TajoConf extends Configuration {
Validators.networkAddr()),
TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()),
- // Tajo Master HA Configurations
+ // High availability configurations
TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
+ // Service discovery
+ DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()),
+ HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"),
+
// Resource tracker service
RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003",
Validators.networkAddr()),
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
index 52c2ade..7001228 100644
--- a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java
@@ -18,8 +18,6 @@
package org.apache.tajo.ha;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf;
@@ -34,15 +32,6 @@ import java.util.ArrayList;
import java.util.List;
public class HAServiceUtil {
- private static Log LOG = LogFactory.getLog(HAServiceUtil.class);
-
- public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.MASTER_UMBILICAL_RPC_ADDRESS);
- }
-
- public static String getMasterUmbilicalName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf));
- }
public static InetSocketAddress getMasterClientAddress(TajoConf conf) {
return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS);
@@ -52,30 +41,6 @@ public class HAServiceUtil {
return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf));
}
- public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
- }
-
- public static String getResourceTrackerName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf));
- }
-
- public static InetSocketAddress getCatalogAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.CATALOG_ADDRESS);
- }
-
- public static String getCatalogName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf));
- }
-
- public static InetSocketAddress getMasterInfoAddress(TajoConf conf) {
- return getMasterAddress(conf, HAConstants.MASTER_INFO_ADDRESS);
- }
-
- public static String getMasterInfoName(TajoConf conf) {
- return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf));
- }
-
public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
InetSocketAddress masterAddress = null;
@@ -153,10 +118,6 @@ public class HAServiceUtil {
return masterAddress;
}
- public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
- return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf);
- }
-
public static boolean isMasterAlive(String masterName, TajoConf conf) {
boolean isAlive = true;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
new file mode 100644
index 0000000..bf7fd2c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class BaseServiceTracker implements ServiceTracker {
+ private final TajoConf conf;
+ private TajoMasterInfo tajoMasterInfo;
+ private List<TajoMasterInfo> tajoMasterInfos;
+
+ @SuppressWarnings("unused")
+ public BaseServiceTracker(TajoConf conf) {
+ this.conf = conf;
+
+ tajoMasterInfo = new TajoMasterInfo();
+ tajoMasterInfo.setActive(true);
+ tajoMasterInfo.setAvailable(true);
+ tajoMasterInfo.setTajoMasterAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+ tajoMasterInfo.setTajoClientAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(conf.getSocketAddrVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+ tajoMasterInfo.setCatalogAddress(conf.getSocketAddrVar(TajoConf.ConfVars.CATALOG_ADDRESS));
+ tajoMasterInfo.setWebServerAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS));
+
+ tajoMasterInfos = TUtil.newList(tajoMasterInfo);
+ }
+
+ @Override
+ public boolean isHighAvailable() {
+ return false;
+ }
+
+ @Override
+ public InetSocketAddress getUmbilicalAddress() {
+ return tajoMasterInfo.getTajoMasterAddress();
+ }
+
+ @Override
+ public InetSocketAddress getClientServiceAddress() {
+ return tajoMasterInfo.getTajoClientAddress();
+ }
+
+ @Override
+ public InetSocketAddress getResourceTrackerAddress() {
+ return tajoMasterInfo.getWorkerResourceTrackerAddr();
+ }
+
+ @Override
+ public InetSocketAddress getCatalogAddress() {
+ return tajoMasterInfo.getCatalogAddress();
+ }
+
+ @Override
+ public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+ return tajoMasterInfo.getWebServerAddress();
+ }
+
+ @Override
+ public void register() throws IOException {
+ }
+
+ @Override
+ public void delete() throws IOException {
+ }
+
+ @Override
+ public boolean isActiveStatus() {
+ return true;
+ }
+
+ @Override
+ public List<TajoMasterInfo> getMasters() throws IOException {
+ return tajoMasterInfos;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
new file mode 100644
index 0000000..c808537
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.hadoop.net.NetUtils;
+
+import javax.net.SocketFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public abstract class HAServiceTracker implements ServiceTracker {
+
+ static SocketFactory socketFactory = SocketFactory.getDefault();
+
+ public boolean isHighAvailable() {
+ return true;
+ }
+
+ public static boolean checkConnection(InetSocketAddress address) {
+ boolean isAlive = true;
+
+ try {
+ int connectionTimeout = 10;
+
+ Socket socket = socketFactory.createSocket();
+ NetUtils.connect(socket, address, connectionTimeout);
+ } catch (Exception e) {
+ isAlive = false;
+ }
+ return isAlive;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
new file mode 100644
index 0000000..73ff112
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public interface ServiceTracker {
+
+ public abstract boolean isHighAvailable();
+
+ public abstract InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getClientServiceAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getCatalogAddress() throws ServiceTrackerException;
+
+ public abstract InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException;
+
+ /**
+ * Add master name to shared storage.
+ */
+ public void register() throws IOException;
+
+
+ /**
+ * Delete master name from shared storage.
+ *
+ */
+ public void delete() throws IOException;
+
+ /**
+ *
+ * @return True if current master is an active master.
+ */
+ public boolean isActiveStatus();
+
+ /**
+ *
+ * @return the list of all masters
+ * @throws IOException
+ */
+ public List<TajoMasterInfo> getMasters() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
new file mode 100644
index 0000000..3407c51
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+public class ServiceTrackerException extends RuntimeException {
+
+ public ServiceTrackerException(Throwable t) {
+ super(t);
+ }
+
+ public ServiceTrackerException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
new file mode 100644
index 0000000..5828055
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.ReflectionUtil;
+
+public class ServiceTrackerFactory {
+
+ public static ServiceTracker get(TajoConf conf) {
+ Class<ServiceTracker> trackerClass;
+
+ try {
+ if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.HA_SERVICE_TRACKER_CLASS);
+ } else {
+ trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.DEFAULT_SERVICE_TRACKER_CLASS);
+ }
+ return ReflectionUtil.newInstance(trackerClass, conf);
+
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
new file mode 100644
index 0000000..481b528
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.service;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Mutable holder for the service addresses of one TajoMaster together with
+ * two status flags (available and active). All values start out unset
+ * ({@code null} addresses, {@code false} flags) until a setter is called.
+ */
+public class TajoMasterInfo {
+
+  // Service addresses.
+  private InetSocketAddress tajoMasterAddress;
+  private InetSocketAddress tajoClientAddress;
+  private InetSocketAddress workerResourceTrackerAddr;
+  private InetSocketAddress catalogAddress;
+  private InetSocketAddress webServerAddress;
+
+  // Status flags.
+  private boolean available;
+  private boolean active;
+
+  /** @return the master address, or null if it has not been set */
+  public InetSocketAddress getTajoMasterAddress() {
+    return this.tajoMasterAddress;
+  }
+
+  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
+    this.tajoMasterAddress = tajoMasterAddress;
+  }
+
+  /** @return the client service address, or null if it has not been set */
+  public InetSocketAddress getTajoClientAddress() {
+    return this.tajoClientAddress;
+  }
+
+  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
+    this.tajoClientAddress = tajoClientAddress;
+  }
+
+  /** @return the worker resource tracker address, or null if it has not been set */
+  public InetSocketAddress getWorkerResourceTrackerAddr() {
+    return this.workerResourceTrackerAddr;
+  }
+
+  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
+    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
+  }
+
+  /** @return the catalog address, or null if it has not been set */
+  public InetSocketAddress getCatalogAddress() {
+    return this.catalogAddress;
+  }
+
+  public void setCatalogAddress(InetSocketAddress catalogAddress) {
+    this.catalogAddress = catalogAddress;
+  }
+
+  /** @return the web server address, or null if it has not been set */
+  public InetSocketAddress getWebServerAddress() {
+    return this.webServerAddress;
+  }
+
+  public void setWebServerAddress(InetSocketAddress webServerAddress) {
+    this.webServerAddress = webServerAddress;
+  }
+
+  /** @return the value last passed to {@link #setAvailable(boolean)}; false by default */
+  public boolean isAvailable() {
+    return this.available;
+  }
+
+  public void setAvailable(boolean available) {
+    this.available = available;
+  }
+
+  /** @return the value last passed to {@link #setActive(boolean)}; false by default */
+  public boolean isActive() {
+    return this.active;
+  }
+
+  public void setActive(boolean active) {
+    this.active = active;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index b1b6450..0304e92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -18,19 +18,23 @@
package org.apache.tajo.benchmark;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.catalog.CatalogConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.store.MemStore;
+import org.apache.tajo.client.DummyServiceTracker;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.FileUtil;
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
@@ -43,9 +47,14 @@ public abstract class BenchmarkSet {
public void init(TajoConf conf, String dataDir) throws IOException {
this.dataDir = dataDir;
- if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
- tajo = new TajoClientImpl(NetUtils.createSocketAddr(
- System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
+
+ if (System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname) != null) {
+
+ String addressStr = System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname);
+ InetSocketAddress addr = NetUtils.createSocketAddr(addressStr);
+ ServiceTracker serviceTracker = new DummyServiceTracker(addr);
+ tajo = new TajoClientImpl(conf, serviceTracker, null);
+
} else {
conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
tajo = new TajoClientImpl(conf);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
deleted file mode 100644
index 1329223..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The HAService is responsible for setting active TajoMaster on startup or when the
- * current active is changing (eg due to failure), monitoring the health of TajoMaster.
- *
- */
-public interface HAService {
-
- /**
- * Add master name to shared storage.
- */
- public void register() throws IOException;
-
-
- /**
- * Delete master name to shared storage.
- *
- */
- public void delete() throws IOException;
-
- /**
- *
- * @return True if current master is an active master.
- */
- public boolean isActiveStatus();
-
- /**
- *
- * @return return all master list
- * @throws IOException
- */
- public List<TajoMasterInfo> getMasters() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
deleted file mode 100644
index e18a9b2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
- *
- */
-public class HAServiceHDFSImpl implements HAService {
- private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
-
- private MasterContext context;
- private TajoConf conf;
-
- private FileSystem fs;
-
- private String masterName;
- private Path rootPath;
- private Path haPath;
- private Path activePath;
- private Path backupPath;
-
- private boolean isActiveStatus = false;
-
- //thread which runs periodically to see the last time since a heartbeat is received.
- private Thread checkerThread;
- private volatile boolean stopped = false;
-
- private int monitorInterval;
-
- private String currentActiveMaster;
-
- public HAServiceHDFSImpl(MasterContext context) throws IOException {
- this.context = context;
- this.conf = context.getConf();
- initSystemDirectory();
-
- InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
- this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
-
- monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
- }
-
- private void initSystemDirectory() throws IOException {
- // Get Tajo root dir
- this.rootPath = TajoConf.getTajoRootDir(conf);
-
- // Check Tajo root dir
- this.fs = rootPath.getFileSystem(conf);
-
- // Check and create Tajo system HA dir
- haPath = TajoConf.getSystemHADir(conf);
- if (!fs.exists(haPath)) {
- fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA dir '" + haPath + "' is created");
- }
-
- activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
- if (!fs.exists(activePath)) {
- fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA Active dir '" + activePath + "' is created");
- }
-
- backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
- if (!fs.exists(backupPath)) {
- fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA Backup dir '" + backupPath + "' is created");
- }
- }
-
- private void startPingChecker() {
- if (checkerThread == null) {
- checkerThread = new Thread(new PingChecker());
- checkerThread.setName("Ping Checker");
- checkerThread.start();
- }
- }
-
- @Override
- public void register() throws IOException {
- FileStatus[] files = fs.listStatus(activePath);
-
- // Phase 1: If there is not another active master, this try to become active master.
- if (files.length == 0) {
- createMasterFile(true);
- currentActiveMaster = masterName;
- LOG.info(String.format("This is added to active master (%s)", masterName));
- } else {
- // Phase 2: If there is active master information, we need to check its status.
- Path activePath = files[0].getPath();
- currentActiveMaster = activePath.getName().replaceAll("_", ":");
-
- // Phase 3: If current active master is dead, this master should be active master.
- if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
- fs.delete(activePath, true);
- createMasterFile(true);
- currentActiveMaster = masterName;
- LOG.info(String.format("This is added to active master (%s)", masterName));
- } else {
- // Phase 4: If current active master is alive, this master need to be backup master.
- createMasterFile(false);
- LOG.info(String.format("This is added to backup masters (%s)", masterName));
- }
- }
- }
-
- private void createMasterFile(boolean isActive) throws IOException {
- String fileName = masterName.replaceAll(":", "_");
- Path path = null;
-
- if (isActive) {
- path = new Path(activePath, fileName);
- } else {
- path = new Path(backupPath, fileName);
- }
-
- StringBuilder sb = new StringBuilder();
- InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.CATALOG_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
-
- FSDataOutputStream out = fs.create(path);
-
- try {
- out.writeUTF(sb.toString());
- out.hflush();
- out.close();
- } catch (FileAlreadyExistsException e) {
- createMasterFile(false);
- }
-
- if (isActive) {
- isActiveStatus = true;
- } else {
- isActiveStatus = false;
- }
-
- startPingChecker();
- }
-
-
- private InetSocketAddress getHostAddress(int type) {
- InetSocketAddress address = null;
-
- switch (type) {
- case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- break;
- case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_CLIENT_RPC_ADDRESS);
- break;
- case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS);
- break;
- case HAConstants.CATALOG_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .CATALOG_ADDRESS);
- break;
- case HAConstants.MASTER_INFO_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_INFO_ADDRESS);
- default:
- break;
- }
-
- return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
- }
-
- @Override
- public void delete() throws IOException {
- String fileName = masterName.replaceAll(":", "_");
-
- Path activeFile = new Path(activePath, fileName);
- if (fs.exists(activeFile)) {
- fs.delete(activeFile, true);
- }
-
- Path backupFile = new Path(backupPath, fileName);
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true);
- }
- if (isActiveStatus) {
- isActiveStatus = false;
- }
- stopped = true;
- }
-
- @Override
- public boolean isActiveStatus() {
- return isActiveStatus;
- }
-
- @Override
- public List<TajoMasterInfo> getMasters() throws IOException {
- List<TajoMasterInfo> list = TUtil.newList();
- Path path = null;
-
- FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 1) {
- path = files[0].getPath();
- list.add(createTajoMasterInfo(path, true));
- }
-
- files = fs.listStatus(backupPath);
- for (FileStatus status : files) {
- path = status.getPath();
- list.add(createTajoMasterInfo(path, false));
- }
-
- return list;
- }
-
- private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
- String masterAddress = path.getName().replaceAll("_", ":");
- boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
-
- FSDataInputStream stream = fs.open(path);
- String data = stream.readUTF();
-
- stream.close();
-
- String[] addresses = data.split("_");
- TajoMasterInfo info = new TajoMasterInfo();
-
- info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
- info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
- info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
- info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
- info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
-
- info.setAvailable(isAlive);
- info.setActive(isActive);
-
- return info;
- }
-
- private class PingChecker implements Runnable {
- @Override
- public void run() {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- synchronized (HAServiceHDFSImpl.this) {
- try {
- if (!currentActiveMaster.equals(masterName)) {
- boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
- + ", isAlive:" + isAlive);
- }
-
- // If active master is dead, this master should be active master instead of
- // previous active master.
- if (!isAlive) {
- FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 0 || (files.length == 1
- && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
- delete();
- register();
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(monitorInterval);
- } catch (InterruptedException e) {
- LOG.info("PingChecker interrupted. - masterName:" + masterName);
- break;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
new file mode 100644
index 0000000..1475a5d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.service.HAServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+import org.apache.tajo.util.TUtil;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
+ *
+ */
+@SuppressWarnings("unused")
+public class HdfsServiceTracker extends HAServiceTracker {
+ private static Log LOG = LogFactory.getLog(HdfsServiceTracker.class);
+
+ private TajoConf conf;
+
+ private FileSystem fs;
+
+ private String masterName;
+ private Path rootPath;
+ private Path haPath;
+ private Path activePath;
+ private Path backupPath;
+
+ private boolean isActiveStatus = false;
+
+ //thread which runs periodically to see the last time since a heartbeat is received.
+ private Thread checkerThread;
+ private volatile boolean stopped = false;
+
+ private int monitorInterval;
+
+ private String currentActiveMaster;
+
+ /**
+  * Prepares the HA directories and derives this master's name ("host:port") from the
+  * configured umbilical RPC address.
+  *
+  * @param conf configuration to read addresses, directories and the monitor interval from
+  * @throws IOException if the HA directories cannot be checked or created
+  */
+ public HdfsServiceTracker(TajoConf conf) throws IOException {
+   this.conf = conf;
+   initSystemDirectory();
+
+   InetSocketAddress umbilical = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+   String host = umbilical.getAddress().getHostAddress();
+   this.masterName = host + ":" + umbilical.getPort();
+
+   this.monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+ }
+
+ /**
+  * Resolves the file system of the Tajo root directory and makes sure that the HA directory
+  * and its active and backup sub-directories exist.
+  *
+  * @throws IOException if a directory cannot be checked or created
+  */
+ private void initSystemDirectory() throws IOException {
+   this.rootPath = TajoConf.getTajoRootDir(conf);
+   this.fs = rootPath.getFileSystem(conf);
+
+   haPath = TajoConf.getSystemHADir(conf);
+   ensureDirectory(haPath, "System HA dir");
+
+   activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+   ensureDirectory(activePath, "System HA Active dir");
+
+   backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+   ensureDirectory(backupPath, "System HA Backup dir");
+ }
+
+ /** Creates {@code dir} with the system resource permission if it does not exist yet. */
+ private void ensureDirectory(Path dir, String label) throws IOException {
+   if (fs.exists(dir)) {
+     return;
+   }
+   fs.mkdirs(dir, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+   LOG.info(label + " '" + dir + "' is created");
+ }
+
+ /** Starts the "Ping Checker" thread unless one has already been created. */
+ private void startPingChecker() {
+   if (checkerThread != null) {
+     return;
+   }
+   checkerThread = new Thread(new PingChecker(), "Ping Checker");
+   checkerThread.start();
+ }
+
+ /**
+  * Registers this master. It becomes the active master when no active entry exists or when
+  * the recorded active master does not respond; otherwise it registers as a backup master.
+  *
+  * @throws IOException if the HA directories cannot be read or written
+  */
+ @Override
+ public void register() throws IOException {
+   FileStatus[] activeEntries = fs.listStatus(activePath);
+
+   if (activeEntries.length > 0) {
+     // An active entry exists: its file name encodes the active master as host_port.
+     Path activeEntry = activeEntries[0].getPath();
+     currentActiveMaster = activeEntry.getName().replaceAll("_", ":");
+
+     if (HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
+       // The active master is alive, so this master joins as a backup.
+       createMasterFile(false);
+       LOG.info(String.format("This is added to backup masters (%s)", masterName));
+       return;
+     }
+
+     // The recorded active master is dead: remove its entry and take over below.
+     fs.delete(activeEntry, true);
+   }
+
+   createMasterFile(true);
+   currentActiveMaster = masterName;
+   LOG.info(String.format("This is added to active master (%s)", masterName));
+ }
+
+ /**
+  * Writes this master's entry file into the active or the backup directory, records the
+  * resulting role in {@code isActiveStatus} and makes sure the ping checker is running.
+  *
+  * The file is named after {@code masterName} with ':' replaced by '_'. Its content is one
+  * UTF string of the following form:
+  *
+  * <pre>
+  * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT}
+  * </pre>
+  *
+  * If the active entry turns out to exist already, this master registers as a backup instead.
+  *
+  * @param isActive true to write into the active directory, false for the backup directory.
+  * @throws IOException if the entry cannot be written
+  */
+ private void createMasterFile(boolean isActive) throws IOException {
+   String fileName = masterName.replaceAll(":", "_");
+   Path path = new Path(isActive ? activePath : backupPath, fileName);
+
+   int[] addressTypes = {
+       HAConstants.MASTER_CLIENT_RPC_ADDRESS,
+       HAConstants.RESOURCE_TRACKER_RPC_ADDRESS,
+       HAConstants.CATALOG_ADDRESS,
+       HAConstants.MASTER_INFO_ADDRESS
+   };
+
+   StringBuilder sb = new StringBuilder();
+   for (int i = 0; i < addressTypes.length; i++) {
+     if (i > 0) {
+       sb.append("_");
+     }
+     InetSocketAddress address = getHostAddress(addressTypes[i]);
+     sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
+   }
+
+   FSDataOutputStream out = null;
+   boolean written = false;
+   try {
+     // create() is inside the try so that a FileAlreadyExistsException raised by it is handled.
+     out = fs.create(path);
+     out.writeUTF(sb.toString());
+     out.hsync();
+     out.close();
+     written = true;
+   } catch (FileAlreadyExistsException e) {
+     if (!isActive) {
+       // There is no further fallback for a backup entry; retrying would recurse forever.
+       throw e;
+     }
+     // Another master owns the active entry: register as a backup. The recursive call sets
+     // the status and starts the ping checker, so do not fall through to the code below.
+     createMasterFile(false);
+     return;
+   } finally {
+     if (!written && out != null) {
+       try {
+         out.close();
+       } catch (IOException ignored) {
+         // Best-effort cleanup; the failure that got us here is the one worth reporting.
+       }
+     }
+   }
+
+   isActiveStatus = isActive;
+
+   startPingChecker();
+ }
+
+
+ /**
+  * Builds the address of one of this master's services by combining the host part of
+  * {@code masterName} with the port configured for that service.
+  *
+  * @param type one of the {@code HAConstants} address type constants
+  * @return the socket address of the requested service on this master's host
+  * @throws IllegalArgumentException if {@code type} is not a known address type
+  */
+ private InetSocketAddress getHostAddress(int type) {
+   InetSocketAddress address;
+
+   switch (type) {
+     case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
+       address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+       break;
+     case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
+       address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
+       break;
+     case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
+       address = conf.getSocketAddrVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+       break;
+     case HAConstants.CATALOG_ADDRESS:
+       address = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS);
+       break;
+     case HAConstants.MASTER_INFO_ADDRESS:
+       address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+       break;
+     default:
+       // Fail with a clear message instead of a NullPointerException further down.
+       throw new IllegalArgumentException("Unknown address type: " + type);
+   }
+
+   // Only the port comes from the configuration; the host is always this master's own.
+   return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
+ }
+
+ @Override
+ public void delete() throws IOException {
+ String fileName = masterName.replaceAll(":", "_");
+
+ Path activeFile = new Path(activePath, fileName);
+ if (fs.exists(activeFile)) {
+ fs.delete(activeFile, true);
+ }
+
+ Path backupFile = new Path(backupPath, fileName);
+ if (fs.exists(backupFile)) {
+ fs.delete(backupFile, true);
+ }
+ if (isActiveStatus) {
+ isActiveStatus = false;
+ }
+ stopped = true;
+ }
+
+ /**
+  * @return true if this master last registered itself as the active master and has not
+  *         been deleted since.
+  */
+ @Override
+ public boolean isActiveStatus() {
+   return this.isActiveStatus;
+ }
+
+ /**
+  * Lists the registered masters: the active master first (only when exactly one active
+  * entry exists), followed by every backup master.
+  *
+  * @return information about all registered masters
+  * @throws IOException if the HA directories or an entry cannot be read
+  */
+ @Override
+ public List<TajoMasterInfo> getMasters() throws IOException {
+   List<TajoMasterInfo> masters = TUtil.newList();
+
+   FileStatus[] activeEntries = fs.listStatus(activePath);
+   if (activeEntries.length == 1) {
+     masters.add(createTajoMasterInfo(activeEntries[0].getPath(), true));
+   }
+
+   for (FileStatus backupEntry : fs.listStatus(backupPath)) {
+     masters.add(createTajoMasterInfo(backupEntry.getPath(), false));
+   }
+
+   return masters;
+ }
+
+ /**
+  * Builds a {@link TajoMasterInfo} from a master entry file. The master address is taken
+  * from the file name ('_' replaced by ':'); the client, resource tracker, catalog and web
+  * server addresses are read from the file content, separated by '_'.
+  *
+  * @param path the master entry file
+  * @param isActive whether the entry comes from the active directory
+  * @return the populated master information
+  * @throws IOException if the entry cannot be read or holds fewer than four addresses
+  */
+ private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+   String masterAddress = path.getName().replaceAll("_", ":");
+   boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+
+   String data;
+   FSDataInputStream stream = fs.open(path);
+   try {
+     data = stream.readUTF();
+   } finally {
+     // Close even when the read fails so that the stream is not leaked.
+     stream.close();
+   }
+
+   String[] addresses = data.split("_");
+   if (addresses.length < 4) {
+     throw new IOException("Malformed master entry '" + path + "': expected 4 addresses but found "
+         + addresses.length);
+   }
+
+   TajoMasterInfo info = new TajoMasterInfo();
+
+   info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
+   info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
+   info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
+   info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
+   info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+
+   info.setAvailable(isAlive);
+   info.setActive(isActive);
+
+   return info;
+ }
+
+ /**
+  * Periodically checks whether the current active master is still alive. When it is not,
+  * and the active directory is empty or still holds only the dead master's entry, this
+  * master deletes its own entries and registers again.
+  */
+ private class PingChecker implements Runnable {
+   @Override
+   public void run() {
+     while (!stopped && !Thread.currentThread().isInterrupted()) {
+       synchronized (HdfsServiceTracker.this) {
+         try {
+           // This thread is started from createMasterFile(), which register() calls before it
+           // assigns currentActiveMaster, so the field may still be null in the first round.
+           if (currentActiveMaster != null && !currentActiveMaster.equals(masterName)) {
+             boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+             if (LOG.isDebugEnabled()) {
+               LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
+                   + ", isAlive:" + isAlive);
+             }
+
+             // If active master is dead, this master should be active master instead of
+             // previous active master.
+             if (!isAlive) {
+               FileStatus[] files = fs.listStatus(activePath);
+               if (files.length == 0 || (files.length == 1
+                   && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
+                 // NOTE(review): delete() sets 'stopped', so this loop exits after the
+                 // delete()/register() pair -- confirm that this is intended.
+                 delete();
+                 register();
+               }
+             }
+           }
+         } catch (Exception e) {
+           // Keep the checker running, but report through the logger rather than stderr.
+           LOG.error("PingChecker failed. - masterName:" + masterName, e);
+         }
+       }
+       try {
+         Thread.sleep(monitorInterval);
+       } catch (InterruptedException e) {
+         LOG.info("PingChecker interrupted. - masterName:" + masterName);
+         // Restore the interrupt flag for whoever inspects this thread afterwards.
+         Thread.currentThread().interrupt();
+         break;
+       }
+     }
+   }
+ }
+
+ // Positions of the individual service addresses within the list built by getAddressElements().
+ private static final int MASTER_UMBILICAL_RPC_ADDRESS = 0;
+ private static final int MASTER_CLIENT_RPC_ADDRESS = 1;
+ private static final int RESOURCE_TRACKER_RPC_ADDRESS = 2;
+ private static final int CATALOG_ADDRESS = 3;
+ private static final int MASTER_HTTP_INFO = 4;
+
+ // Last resolved address of each service; re-resolved by the getters below when
+ // checkConnection() reports false.
+ private volatile InetSocketAddress umbilicalRpcAddr;
+ private volatile InetSocketAddress clientRpcAddr;
+ private volatile InetSocketAddress resourceTrackerRpcAddr;
+ private volatile InetSocketAddress catalogAddr;
+ private volatile InetSocketAddress masterHttpInfoAddr;
+
+ /**
+  * @return the cached umbilical RPC address if checkConnection() accepts it; otherwise the
+  *         address re-read from the active master entry.
+  */
+ @Override
+ public InetSocketAddress getUmbilicalAddress() {
+   if (checkConnection(umbilicalRpcAddr)) {
+     return umbilicalRpcAddr;
+   }
+
+   List<String> addresses = getAddressElements(conf);
+   umbilicalRpcAddr = NetUtils.createSocketAddr(addresses.get(MASTER_UMBILICAL_RPC_ADDRESS));
+   return umbilicalRpcAddr;
+ }
+
+ /**
+  * @return the cached client RPC address if checkConnection() accepts it; otherwise the
+  *         address re-read from the active master entry.
+  */
+ @Override
+ public InetSocketAddress getClientServiceAddress() {
+   if (checkConnection(clientRpcAddr)) {
+     return clientRpcAddr;
+   }
+
+   List<String> addresses = getAddressElements(conf);
+   clientRpcAddr = NetUtils.createSocketAddr(addresses.get(MASTER_CLIENT_RPC_ADDRESS));
+   return clientRpcAddr;
+ }
+
+ /**
+  * @return the cached resource tracker RPC address if checkConnection() accepts it;
+  *         otherwise the address re-read from the active master entry.
+  */
+ @Override
+ public InetSocketAddress getResourceTrackerAddress() {
+   if (checkConnection(resourceTrackerRpcAddr)) {
+     return resourceTrackerRpcAddr;
+   }
+
+   List<String> addresses = getAddressElements(conf);
+   resourceTrackerRpcAddr = NetUtils.createSocketAddr(addresses.get(RESOURCE_TRACKER_RPC_ADDRESS));
+   return resourceTrackerRpcAddr;
+ }
+
+ /**
+  * @return the cached catalog address if checkConnection() accepts it; otherwise the
+  *         address re-read from the active master entry.
+  */
+ @Override
+ public InetSocketAddress getCatalogAddress() {
+   if (checkConnection(catalogAddr)) {
+     return catalogAddr;
+   }
+
+   List<String> addresses = getAddressElements(conf);
+   catalogAddr = NetUtils.createSocketAddr(addresses.get(CATALOG_ADDRESS));
+   return catalogAddr;
+ }
+
+ /**
+  * @return the cached master HTTP info address if checkConnection() accepts it; otherwise
+  *         the address re-read from the active master entry.
+  * @throws ServiceTrackerException if the active master entry cannot be read
+  */
+ @Override
+ public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+   if (checkConnection(masterHttpInfoAddr)) {
+     return masterHttpInfoAddr;
+   }
+
+   List<String> addresses = getAddressElements(conf);
+   masterHttpInfoAddr = NetUtils.createSocketAddr(addresses.get(MASTER_HTTP_INFO));
+   return masterHttpInfoAddr;
+ }
+
+ /**
+  * Reads the single entry of the active master directory and returns all service addresses
+  * of the active master as "host:port" strings.
+  *
+  * The first element is the umbilical RPC address, taken from the entry's file name. The
+  * remaining four elements are read from the file content, in the order client RPC,
+  * resource tracker, catalog and master web address.
+  *
+  * @param conf configuration used to locate the file system and the HA directory
+  * @return the five service addresses of the active master
+  * @throws ServiceTrackerException if the active directory or its single entry is missing,
+  *         malformed or unreadable
+  */
+ private static List<String> getAddressElements(TajoConf conf) throws ServiceTrackerException {
+
+   try {
+     FileSystem fs = getFileSystem(conf);
+     Path activeMasterBaseDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+
+     if (!fs.exists(activeMasterBaseDir)) {
+       throw new ServiceTrackerException("No such active master base path: " + activeMasterBaseDir);
+     }
+     if (!fs.isDirectory(activeMasterBaseDir)) {
+       throw new ServiceTrackerException("Active master base path must be a directory.");
+     }
+
+     FileStatus[] files = fs.listStatus(activeMasterBaseDir);
+
+     if (files.length < 1) {
+       throw new ServiceTrackerException("No active master entry");
+     } else if (files.length > 1) {
+       throw new ServiceTrackerException("Two or more than active master entries.");
+     }
+
+     // We can ensure that there is only one file due to the above assertion.
+     Path activeMasterEntry = files[0].getPath();
+
+     if (!fs.isFile(activeMasterEntry)) {
+       throw new ServiceTrackerException("Active master entry must be a file, but it is a directory.");
+     }
+
+     List<String> addressElements = TUtil.newList();
+
+     addressElements.add(activeMasterEntry.getName().replaceAll("_", ":")); // Add UMBILICAL_RPC_ADDRESS to elements
+
+     String data;
+     FSDataInputStream stream = fs.open(activeMasterEntry);
+     try {
+       data = stream.readUTF();
+     } finally {
+       // Close even when the read fails so that the stream is not leaked.
+       stream.close();
+     }
+
+     addressElements.addAll(TUtil.newList(data.split("_"))); // Add remaining entries to elements
+
+     // ensure the number of entries
+     Preconditions.checkState(addressElements.size() == 5,
+         "Expected 5 service addresses but found %s.", addressElements.size());
+
+     return addressElements;
+
+   } catch (ServiceTrackerException e) {
+     // Raised above with a precise message; wrapping it again would only bury that message.
+     throw e;
+   } catch (Throwable t) {
+     throw new ServiceTrackerException(t);
+   }
+ }
+
+
+ public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) {
+ return isMasterAlive(org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(masterAddress), conf);
+ }
+
+  /**
+   * Checks whether a master is alive by trying to open a socket connection to it.
+   *
+   * The connection timeout is read from the IPC client connect timeout setting of the given
+   * configuration. The probe socket is always closed before returning.
+   *
+   * @param masterName master address in a form accepted by NetUtils.createSocketAddr
+   * @param conf configuration providing the socket factory and the connection timeout
+   * @return true if the connection was established; false if any exception occurred
+   */
+  public static boolean isMasterAlive(String masterName, TajoConf conf) {
+    Socket socket = null;
+
+    try {
+      // how to create sockets
+      SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf);
+
+      int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
+          CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
+
+      InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName);
+
+      // connected socket
+      socket = socketFactory.createSocket();
+      org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout);
+      return true;
+    } catch (Exception e) {
+      // Any failure to resolve or connect means the master is considered not alive.
+      return false;
+    } finally {
+      // The socket is only a probe; release it so repeated checks do not leak descriptors.
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // Best effort: a failure to close must not change the liveness result.
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up the HA state of the given master from the entries in the HA directories.
+   *
+   * @param masterName master address; ':' is replaced by '_' to match the entry file names
+   * @param conf configuration used to locate the file system and the HA directories
+   * @return 0 if the master is listed as a backup master, 1 if it is the single active master
+   *         entry, -2 if it is found in neither directory, and -1 if the lookup failed with
+   *         an exception
+   */
+  public static int getState(String masterName, TajoConf conf) {
+    String targetMaster = masterName.replaceAll(":", "_");
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Check backup masters
+      for (FileStatus status : fs.listStatus(backupPath)) {
+        if (status.getPath().getName().equals(targetMaster)) {
+          return 0;
+        }
+      }
+
+      // Check active master
+      FileStatus[] files = fs.listStatus(activePath);
+      if (files.length == 1 && files[0].getPath().getName().equals(targetMaster)) {
+        return 1;
+      }
+
+      return -2;
+    } catch (Exception e) {
+      // Report through the class logger rather than stderr so the failure reaches the log files.
+      LOG.error("Failed to get the HA state of master: " + masterName, e);
+      return -1;
+    }
+  }
+
+  /**
+   * Deletes the HA directory, but only when none of the registered masters is alive.
+   *
+   * Every backup master entry, and the active master entry when exactly one exists, is probed
+   * with isMasterAlive() before anything is deleted.
+   *
+   * @param conf configuration used to locate the file system and the HA directories
+   * @return 0 if at least one master is alive (nothing is deleted), 1 after the HA directory
+   *         delete was issued, and -1 if the operation failed with an exception
+   */
+  public static int formatHA(TajoConf conf) {
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      int aliveMasterCount = 0;
+
+      // Check backup masters
+      for (FileStatus status : fs.listStatus(backupPath)) {
+        if (isMasterAlive(status.getPath().getName().replaceAll("_", ":"), conf)) {
+          aliveMasterCount++;
+        }
+      }
+
+      // Check active master
+      FileStatus[] files = fs.listStatus(activePath);
+      if (files.length == 1 && isMasterAlive(files[0].getPath().getName().replaceAll("_", ":"), conf)) {
+        aliveMasterCount++;
+      }
+
+      // If there is any alive master, users can't format storage.
+      if (aliveMasterCount > 0) {
+        return 0;
+      }
+
+      // delete ha path.
+      fs.delete(TajoConf.getSystemHADir(conf), true);
+      return 1;
+    } catch (Exception e) {
+      // Report through the class logger rather than stderr so the failure reaches the log files.
+      LOG.error("Failed to format the HA directory", e);
+      return -1;
+    }
+  }
+
+
+  /**
+   * Lists the addresses of all masters registered in the HA directories.
+   *
+   * Backup masters are listed first, followed by the active master when exactly one active
+   * entry exists. Entry file names are converted to addresses by replacing '_' with ':'.
+   *
+   * @param conf configuration used to locate the file system and the HA directories
+   * @return the master addresses collected so far; if an exception occurs, it is logged and
+   *         the possibly incomplete list is returned
+   */
+  public static List<String> getMasters(TajoConf conf) {
+    List<String> masters = new ArrayList<String>();
+
+    try {
+      FileSystem fs = getFileSystem(conf);
+      Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+      Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+
+      // Check backup masters
+      for (FileStatus status : fs.listStatus(backupPath)) {
+        masters.add(status.getPath().getName().replaceAll("_", ":"));
+      }
+
+      // Check active master
+      FileStatus[] files = fs.listStatus(activePath);
+      if (files.length == 1) {
+        masters.add(files[0].getPath().getName().replaceAll("_", ":"));
+      }
+
+    } catch (Exception e) {
+      // Report through the class logger rather than stderr so the failure reaches the log files.
+      LOG.error("Failed to list the masters registered in the HA directories", e);
+    }
+    return masters;
+  }
+
+  /**
+   * Returns the file system that owns the Tajo root directory of the given configuration.
+   *
+   * @param conf configuration providing the Tajo root directory
+   * @return the file system of the Tajo root directory
+   * @throws IOException if the file system cannot be obtained
+   */
+  private static FileSystem getFileSystem(TajoConf conf) throws IOException {
+    return TajoConf.getTajoRootDir(conf).getFileSystem(conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
deleted file mode 100644
index c6fdd40..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.ha;
-
-import java.net.InetSocketAddress;
-
-public class TajoMasterInfo {
-
- private boolean available;
- private boolean isActive;
-
- private InetSocketAddress tajoMasterAddress;
- private InetSocketAddress tajoClientAddress;
- private InetSocketAddress workerResourceTrackerAddr;
- private InetSocketAddress catalogAddress;
- private InetSocketAddress webServerAddress;
-
- public InetSocketAddress getTajoMasterAddress() {
- return tajoMasterAddress;
- }
-
- public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
- this.tajoMasterAddress = tajoMasterAddress;
- }
-
- public InetSocketAddress getTajoClientAddress() {
- return tajoClientAddress;
- }
-
- public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
- this.tajoClientAddress = tajoClientAddress;
- }
-
- public InetSocketAddress getWorkerResourceTrackerAddr() {
- return workerResourceTrackerAddr;
- }
-
- public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
- this.workerResourceTrackerAddr = workerResourceTrackerAddr;
- }
-
- public InetSocketAddress getCatalogAddress() {
- return catalogAddress;
- }
-
- public void setCatalogAddress(InetSocketAddress catalogAddress) {
- this.catalogAddress = catalogAddress;
- }
-
- public InetSocketAddress getWebServerAddress() {
- return webServerAddress;
- }
-
- public void setWebServerAddress(InetSocketAddress webServerAddress) {
- this.webServerAddress = webServerAddress;
- }
-
- public boolean isAvailable() {
- return available;
- }
-
- public void setAvailable(boolean available) {
- this.available = available;
- }
-
- public boolean isActive() {
- return isActive;
- }
-
- public void setActive(boolean active) {
- isActive = active;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 42ffd87..996d356 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -38,6 +38,8 @@ import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.worker.TajoWorker;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -45,6 +47,7 @@ import java.util.List;
public class TajoContainerProxy extends ContainerProxy {
private final QueryContext queryContext;
+ private final TajoWorker.WorkerContext workerContext;
private final String planJson;
public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
@@ -52,6 +55,7 @@ public class TajoContainerProxy extends ContainerProxy {
QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
super(context, conf, executionBlockId, container);
this.queryContext = queryContext;
+ this.workerContext = context.getQueryMasterContext().getWorkerContext();
this.planJson = planJson;
}
@@ -171,27 +175,8 @@ public class TajoContainerProxy extends ContainerProxy {
RpcConnectionPool connPool = RpcConnectionPool.getPool();
NettyClientBase tmClient = null;
try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- TajoConf conf = context.getConf();
- if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- } catch (Exception e) {
- context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(conf));
- context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(conf));
- tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- QueryCoordinatorProtocol.class, true);
- }
+ ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker();
+ tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.releaseWorkerResource(null,
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 786025a..a11606f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -40,18 +40,18 @@ import org.apache.tajo.catalog.LocalCatalogWrapper;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.FunctionLoader;
-import org.apache.tajo.ha.HAService;
-import org.apache.tajo.ha.HAServiceHDFSImpl;
-import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
-import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.SessionManager;
+import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rule.EvaluationContext;
import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.session.SessionManager;
import org.apache.tajo.storage.FileStorageManager;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.util.*;
@@ -127,7 +127,7 @@ public class TajoMaster extends CompositeService {
private TajoSystemMetrics systemMetrics;
- private HAService haService;
+ private ServiceTracker haService;
private JvmPauseMonitor pauseMonitor;
@@ -226,15 +226,6 @@ public class TajoMaster extends CompositeService {
}
}
-
- private void initHAManger() throws Exception {
- // If tajo provides haService based on ZooKeeper, following codes need to update.
- if (systemConf.getBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE)) {
- haService = new HAServiceHDFSImpl(context);
- haService.register();
- }
- }
-
public boolean isActiveMaster() {
return (haService != null ? haService.isActiveStatus() : true);
}
@@ -326,11 +317,8 @@ public class TajoMaster extends CompositeService {
initSystemMetrics();
- try {
- initHAManger();
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
+ haService = ServiceTrackerFactory.get(systemConf);
+ haService.register();
historyWriter = new HistoryWriter(getMasterName(), true);
historyWriter.init(getConfig());
@@ -477,7 +465,7 @@ public class TajoMaster extends CompositeService {
return systemMetrics;
}
- public HAService getHAService() {
+ public ServiceTracker getHAService() {
return haService;
}