Posted to commits@hbase.apache.org by jx...@apache.org on 2014/03/25 20:34:55 UTC
svn commit: r1581479 [2/9] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-it/src/test/java/org/apache/hadoop/hbase/
hbase-it/src/test/java/org/apache/hadoo...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1581479&r1=1581478&r2=1581479&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Mar 25 19:34:52 2014
@@ -26,30 +26,21 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import javax.management.ObjectName;
+import javax.servlet.http.HttpServlet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -57,7 +48,6 @@ import org.apache.hadoop.hbase.HColumnDe
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
@@ -70,25 +60,18 @@ import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
-import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.exceptions.MergeRegionException;
-import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
-import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@@ -108,150 +91,33 @@ import org.apache.hadoop.hbase.master.sn
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.InfoServer;
-import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Sleeper;
-import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
-import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
-import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.metrics.util.MBeanUtil;
-import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
/**
* HMaster is the "master server" for HBase. An HBase cluster has one active
@@ -270,22 +136,13 @@ import com.google.protobuf.ServiceExcept
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
-public class HMaster extends HasThread implements MasterProtos.MasterService.BlockingInterface,
-RegionServerStatusProtos.RegionServerStatusService.BlockingInterface,
-MasterServices, Server {
+public class HMaster extends HRegionServer implements MasterServices, Server {
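
The headline change in this hunk: HMaster no longer implements the MasterService and
RegionServerStatusService protobuf blocking interfaces itself. It now extends HRegionServer,
and the protobuf endpoints move into the new MasterRpcServices class (wired up through the
createRpcServices() override further down in this diff). Below is a minimal, hypothetical
sketch of that split; BaseServer, RpcHandler, MasterRpcHandler and Master are invented
stand-in names, not HBase classes.

    // Hypothetical stand-ins to illustrate the new structure; none of these are HBase classes.
    interface MasterRpc { boolean isMasterRunning(); }   // stands in for the protobuf interface

    class RpcHandler {}                                   // stands in for RSRpcServices

    class MasterRpcHandler extends RpcHandler implements MasterRpc {
      private final Master master;
      MasterRpcHandler(Master master) { this.master = master; }
      // RPC endpoints delegate to the server instance rather than the server
      // implementing the protobuf interfaces itself, as the old HMaster did.
      @Override public boolean isMasterRunning() { return !master.stopped; }
    }

    class BaseServer {                                    // stands in for HRegionServer
      volatile boolean stopped = false;
      protected RpcHandler rpcHandler;
      protected RpcHandler createRpcHandler() { return new RpcHandler(); }
    }

    class Master extends BaseServer {                     // stands in for HMaster
      @Override protected RpcHandler createRpcHandler() { return new MasterRpcHandler(this); }

      public static void main(String[] args) {
        Master m = new Master();
        m.rpcHandler = m.createRpcHandler();
        System.out.println(((MasterRpc) m.rpcHandler).isMasterRunning());  // prints "true"
      }
    }
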
private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
// MASTER is name of the webapp and the attribute name used stuffing this
//instance into web context.
public static final String MASTER = "master";
- // The configuration for the Master
- private final Configuration conf;
- // server for the web ui
- private InfoServer infoServer;
-
- // Our zk client.
- private ZooKeeperWatcher zooKeeper;
// Manager and zk listener for master election
private ActiveMasterManager activeMasterManager;
// Region server tracker
@@ -293,51 +150,28 @@ MasterServices, Server {
// Draining region server tracker
private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
- private LoadBalancerTracker loadBalancerTracker;
- // master address tracker
- private MasterAddressTracker masterAddressTracker;
-
- // RPC server for the HMaster
- private final RpcServerInterface rpcServer;
- private JvmPauseMonitor pauseMonitor;
- // Set after we've called HBaseServer#openServer and ready to receive RPCs.
- // Set back to false after we stop rpcServer. Used by tests.
- private volatile boolean rpcServerOpen = false;
+ LoadBalancerTracker loadBalancerTracker;
/** Namespace stuff */
private TableNamespaceManager tableNamespaceManager;
private NamespaceJanitor namespaceJanitorChore;
- /**
- * This servers address.
- */
- private final InetSocketAddress isa;
-
// Metrics for the HMaster
- private final MetricsMaster metricsMaster;
+ final MetricsMaster metricsMaster;
// file system manager for the master FS operations
private MasterFileSystem fileSystemManager;
// server manager to deal with region server info
- ServerManager serverManager;
+ volatile ServerManager serverManager;
// manager of assignment nodes in zookeeper
AssignmentManager assignmentManager;
- // manager of catalog regions
- private CatalogTracker catalogTracker;
- // Cluster status zk tracker and local setter
- private ClusterStatusTracker clusterStatusTracker;
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
// operations/debugging.
- private MemoryBoundedLogMessageBuffer rsFatals;
+ MemoryBoundedLogMessageBuffer rsFatals;
- // This flag is for stopping this Master instance. Its set when we are
- // stopping or aborting
- private volatile boolean stopped = false;
- // Set on abort -- usually failure of our zk session.
- private volatile boolean abort = false;
// flag set after we become the active master (used for testing)
private volatile boolean isActiveMaster = false;
@@ -345,134 +179,65 @@ MasterServices, Server {
// it is not private since it's used in unit tests
volatile boolean initialized = false;
+ // flag set after master services are started,
+ // initialization may have not completed yet.
+ volatile boolean serviceStarted = false;
+
// flag set after we complete assignMeta.
private volatile boolean serverShutdownHandlerEnabled = false;
- // Instance of the hbase executor service.
- ExecutorService executorService;
-
- private LoadBalancer balancer;
- private Thread balancerChore;
- private Thread clusterStatusChore;
+ LoadBalancer balancer;
+ private BalancerChore balancerChore;
+ private ClusterStatusChore clusterStatusChore;
private ClusterStatusPublisher clusterStatusPublisherChore = null;
- private CatalogJanitor catalogJanitorChore;
+ CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
- private MasterCoprocessorHost cpHost;
- private final ServerName serverName;
-
- private TableDescriptors tableDescriptors;
-
- // Table level lock manager for schema changes
- private TableLockManager tableLockManager;
+ MasterCoprocessorHost cpHost;
- // Time stamps for when a hmaster was started and when it became active
- private long masterStartTime;
+ // Time stamps for when a hmaster became active
private long masterActiveTime;
- /** time interval for emitting metrics values */
- private final int msgInterval;
- /**
- * MX Bean for MasterInfo
- */
- private ObjectName mxBean = null;
-
//should we check the compression codec type at master side, default true, HBASE-6370
private final boolean masterCheckCompression;
- private SpanReceiverHost spanReceiverHost;
-
- private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
+ Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
// monitor for snapshot of hbase tables
- private SnapshotManager snapshotManager;
+ SnapshotManager snapshotManager;
// monitor for distributed procedures
- private MasterProcedureManagerHost mpmHost;
-
- /** The health check chore. */
- private HealthCheckChore healthCheckChore;
-
- /**
- * is in distributedLogReplay mode. When true, SplitLogWorker directly replays WAL edits to newly
- * assigned region servers instead of creating recovered.edits files.
- */
- private final boolean distributedLogReplay;
+ MasterProcedureManagerHost mpmHost;
/** flag used in test cases in order to simulate RS failures during master initialization */
private volatile boolean initializationBeforeMetaAssignment = false;
- /** The following is used in master recovery scenario to re-register listeners */
- private List<ZooKeeperListener> registeredZKListenersBeforeRecovery;
-
/**
* Initializes the HMaster. The steps are as follows:
* <p>
* <ol>
- * <li>Initialize HMaster RPC and address
- * <li>Connect to ZooKeeper.
+ * <li>Initialize the local HRegionServer
+ * <li>Start the ActiveMasterManager.
* </ol>
* <p>
- * Remaining steps of initialization occur in {@link #run()} so that they
- * run in their own thread rather than within the context of the constructor.
+ * Remaining steps of initialization occur in
+ * {@link #finishActiveMasterInitialization(MonitoredTask)} after
+ * the master becomes the active one.
+ *
* @throws InterruptedException
+ * @throws KeeperException
+ * @throws IOException
*/
public HMaster(final Configuration conf)
- throws IOException, KeeperException, InterruptedException {
- this.conf = new Configuration(conf);
- // Disable the block cache on the master
- this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
- FSUtils.setupShortCircuitRead(conf);
- // Server to handle client requests.
- String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
- conf.get("hbase.master.dns.interface", "default"),
- conf.get("hbase.master.dns.nameserver", "default")));
- int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
- // Test that the hostname is reachable
- InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
- if (initialIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
- }
- // Verify that the bind address is reachable if set
- String bindAddress = conf.get("hbase.master.ipc.address");
- if (bindAddress != null) {
- initialIsa = new InetSocketAddress(bindAddress, port);
- if (initialIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
- }
- }
- String name = "master/" + initialIsa.toString();
- // Set how many times to retry talking to another server over HConnection.
- ConnectionUtils.setServerSideHConnectionRetriesConfig(this.conf, name, LOG);
- int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
- conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
- this.rpcServer = new RpcServer(this, name, getServices(),
- initialIsa, // BindAddress is IP we got for this server.
- conf,
- new FifoRpcScheduler(conf, numHandlers));
- // Set our address.
- this.isa = this.rpcServer.getListenerAddress();
- // We don't want to pass isa's hostname here since it could be 0.0.0.0
- this.serverName = ServerName.valueOf(hostname, this.isa.getPort(), System.currentTimeMillis());
+ throws IOException, KeeperException, InterruptedException {
+ super(conf);
this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
- // login the zookeeper client principal (if using security)
- ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
- "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
-
- // initialize server principal (if using secure Hadoop)
- UserProvider provider = UserProvider.instantiate(conf);
- provider.login("hbase.master.keytab.file",
- "hbase.master.kerberos.principal", this.isa.getHostName());
-
LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));
- // set the thread name now we have an address
- setName(MASTER + ":" + this.serverName.toShortString());
-
Replication.decorateMasterConfiguration(this.conf);
// Hack! Maps DFSClient => Master for logs. HDFS made this
@@ -481,26 +246,11 @@ MasterServices, Server {
this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
}
- this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
- this.rpcServer.startThreads();
- this.pauseMonitor = new JvmPauseMonitor(conf);
- this.pauseMonitor.start();
-
- // metrics interval: using the same property as region server.
- this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
-
//should we check the compression codec type at master side, default true, HBASE-6370
this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
- // Health checker thread.
- int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
- HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
- if (isHealthCheckerConfigured()) {
- healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
- }
-
// Do we publish the status?
boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -519,161 +269,60 @@ MasterServices, Server {
Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
}
}
+ startActiveMasterManager();
+ }
- distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
- HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+ @VisibleForTesting
+ public MasterRpcServices getMasterRpcServices() {
+ return (MasterRpcServices)rpcServices;
}
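
Per the rewritten constructor javadoc above, the HMaster constructor now only does local
setup: it delegates the shared server bootstrap to super(conf) (the HRegionServer
constructor) and then calls startActiveMasterManager(), while everything that needs the
active-master lease is deferred to finishActiveMasterInitialization(MonitoredTask).
getMasterRpcServices() is simply a typed accessor over the inherited rpcServices field,
exposed for tests. A rough sketch of this two-phase startup pattern, with invented names
(TwoPhaseServer and its methods are not HBase code):

    // Hypothetical sketch of a two-phase startup; class and method names are invented.
    class TwoPhaseServer {
      private volatile boolean active = false;

      TwoPhaseServer() {
        // Phase 1: cheap, local initialization only (config, metrics, buffers).
        startElection();                      // returns immediately; election runs async
      }

      private void startElection() {
        Thread t = new Thread(() -> {
          // ... block here until this instance wins the election ...
          finishActiveInitialization();       // Phase 2: expensive, cluster-wide setup
        }, "election");
        t.setDaemon(true);
        t.start();
      }

      private void finishActiveInitialization() {
        active = true;                        // trackers, managers and chores would start here
      }

      boolean isActive() { return active; }

      public static void main(String[] args) throws InterruptedException {
        TwoPhaseServer s = new TwoPhaseServer();
        Thread.sleep(50);
        System.out.println("active=" + s.isActive());
      }
    }
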
- /**
- * @return list of blocking services and their security info classes that this server supports
- */
- private List<BlockingServiceAndInterface> getServices() {
- List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(3);
- bssi.add(new BlockingServiceAndInterface(
- MasterProtos.MasterService.newReflectiveBlockingService(this),
- MasterProtos.MasterService.BlockingInterface.class));
- bssi.add(new BlockingServiceAndInterface(
- RegionServerStatusProtos.RegionServerStatusService.newReflectiveBlockingService(this),
- RegionServerStatusProtos.RegionServerStatusService.BlockingInterface.class));
- return bssi;
+ public boolean balanceSwitch(final boolean b) throws IOException {
+ return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
}
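
balanceSwitch() is now a thin wrapper: the actual switching logic, including the coprocessor
pre/post hooks, the LoadBalancerTracker update and the SYNC versus ASYNC modes, lives in
MasterRpcServices.switchBalancer, which is why the old switchBalancer implementation shows
up as removed code later in this file. A hedged, self-contained illustration of the
SYNC/ASYNC distinction (class and field names invented; this is not the HBase
implementation):

    // Hypothetical illustration of SYNC vs ASYNC switch semantics; not the HBase code.
    enum Mode { SYNC, ASYNC }

    class BalancerSwitch {
      private final Object balancerLock = new Object();
      private volatile boolean on = true;

      boolean switchTo(boolean newValue, Mode mode) {
        boolean oldValue = on;
        if (mode == Mode.SYNC) {
          // SYNC: take the balancer lock so a concurrently running balance()
          // cannot overlap with the state change.
          synchronized (balancerLock) { on = newValue; }
        } else {
          // ASYNC: flip the flag immediately; an in-flight balance() may still finish.
          on = newValue;
        }
        return oldValue;
      }
    }
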
- /**
- * Stall startup if we are designated a backup master; i.e. we want someone
- * else to become the master before proceeding.
- * @param c configuration
- * @param amm
- * @throws InterruptedException
- */
- private static void stallIfBackupMaster(final Configuration c,
- final ActiveMasterManager amm)
- throws InterruptedException {
- // If we're a backup master, stall until a primary to writes his address
- if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
- HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
- return;
- }
- LOG.debug("HMaster started in backup mode. " +
- "Stalling until master znode is written.");
- // This will only be a minute or so while the cluster starts up,
- // so don't worry about setting watches on the parent znode
- while (!amm.isActiveMaster()) {
- LOG.debug("Waiting for master address ZNode to be written " +
- "(Also watching cluster state node)");
- Thread.sleep(
- c.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT));
- }
+ protected String getProcessName() {
+ return MASTER;
+ }
+ protected boolean canCreateBaseZNode() {
+ return true;
}
- MetricsMaster getMetrics() {
- return metricsMaster;
+ protected boolean canUpdateTableDescriptor() {
+ return true;
}
- /**
- * Main processing loop for the HMaster.
- * <ol>
- * <li>Block until becoming active master
- * <li>Finish initialization via finishInitialization(MonitoredTask)
- * <li>Enter loop until we are stopped
- * <li>Stop services and perform cleanup once stopped
- * </ol>
- */
- @Override
- public void run() {
- MonitoredTask startupStatus =
- TaskMonitor.get().createStatus("Master startup");
- startupStatus.setDescription("Master startup");
- masterStartTime = System.currentTimeMillis();
- try {
- this.masterAddressTracker = new MasterAddressTracker(getZooKeeperWatcher(), this);
- this.masterAddressTracker.start();
+ protected RSRpcServices createRpcServices() throws IOException {
+ return new MasterRpcServices(this);
+ }
- // Put up info server.
- int port = this.conf.getInt("hbase.master.info.port", HConstants.DEFAULT_MASTER_INFOPORT);
- if (port >= 0) {
- String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
- this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
- this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
- this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
- this.infoServer.setAttribute(MASTER, this);
- this.infoServer.start();
- }
-
- this.registeredZKListenersBeforeRecovery = this.zooKeeper.getListeners();
- /*
- * Block on becoming the active master.
- *
- * We race with other masters to write our address into ZooKeeper. If we
- * succeed, we are the primary/active master and finish initialization.
- *
- * If we do not succeed, there is another active master and we should
- * now wait until it dies to try and become the next active master. If we
- * do not succeed on our first attempt, this is no longer a cluster startup.
- */
- becomeActiveMaster(startupStatus);
-
- // We are either the active master or we were asked to shutdown
- if (!this.stopped) {
- finishInitialization(startupStatus, false);
- loop();
- }
- } catch (Throwable t) {
- // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
- if (t instanceof NoClassDefFoundError &&
- t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
- // improved error message for this special case
- abort("HBase is having a problem with its Hadoop jars. You may need to "
- + "recompile HBase against Hadoop version "
- + org.apache.hadoop.util.VersionInfo.getVersion()
- + " or change your hadoop jars to start properly", t);
- } else {
- abort("Unhandled exception. Starting shutdown.", t);
- }
- } finally {
- startupStatus.cleanup();
+ protected void configureInfoServer() {
+ infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
+ infoServer.setAttribute(MASTER, this);
+ super.configureInfoServer();
+ }
- stopChores();
- // Wait for all the remaining region servers to report in IFF we were
- // running a cluster shutdown AND we were NOT aborting.
- if (!this.abort && this.serverManager != null &&
- this.serverManager.isClusterShutdown()) {
- this.serverManager.letRegionServersShutdown();
- }
- stopServiceThreads();
- // Stop services started for both backup and active masters
- if (this.activeMasterManager != null) this.activeMasterManager.stop();
- if (this.catalogTracker != null) this.catalogTracker.stop();
- if (this.serverManager != null) this.serverManager.stop();
- if (this.assignmentManager != null) this.assignmentManager.stop();
- if (this.fileSystemManager != null) this.fileSystemManager.stop();
- if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
- this.zooKeeper.close();
- }
- LOG.info("HMaster main thread exiting");
+ protected Class<? extends HttpServlet> getDumpServlet() {
+ return MasterDumpServlet.class;
}
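
With HMaster now a subclass of HRegionServer, master-specific behavior is plugged in through
a small set of protected overrides: getProcessName(), canCreateBaseZNode(),
canUpdateTableDescriptor(), createRpcServices(), configureInfoServer() and getDumpServlet().
Below is a minimal template-method sketch of how such hooks compose; Node and MasterNode are
hypothetical classes used only for illustration.

    // Hypothetical template-method sketch; these are not HBase classes.
    abstract class Node {
      final void start() {
        System.out.println("starting " + processName());
        if (canCreateBaseZNode()) {
          // create shared znodes ...
        }
        configureInfoServer();
      }
      protected String processName()         { return "regionserver"; }
      protected boolean canCreateBaseZNode() { return false; }
      protected void configureInfoServer()   { /* register the common servlets */ }
    }

    class MasterNode extends Node {
      @Override protected String processName()         { return "master"; }
      @Override protected boolean canCreateBaseZNode() { return true; }
      @Override protected void configureInfoServer() {
        // add master-only servlets first, then fall back to the common ones
        super.configureInfoServer();
      }
    }
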
/**
- * Try becoming active master.
- * @param startupStatus
- * @return True if we could successfully become the active master.
- * @throws InterruptedException
+ * Emit the HMaster metrics, such as region in transition metrics.
+ * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
*/
- private boolean becomeActiveMaster(MonitoredTask startupStatus)
- throws InterruptedException {
- // TODO: This is wrong!!!! Should have new servername if we restart ourselves,
- // if we come back to life.
- this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
- this);
- this.zooKeeper.registerListener(activeMasterManager);
- stallIfBackupMaster(this.conf, this.activeMasterManager);
-
- // The ClusterStatusTracker is setup before the other
- // ZKBasedSystemTrackers because it's needed by the activeMasterManager
- // to check if the cluster should be shutdown.
- this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
- this.clusterStatusTracker.start();
- return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
+ protected void doMetrics() {
+ try {
+ if (assignmentManager != null) {
+ assignmentManager.updateRegionsInTransitionMetrics();
+ }
+ } catch (Throwable e) {
+ LOG.error("Couldn't update metrics: " + e.getMessage());
+ }
+ }
+
+ MetricsMaster getMasterMetrics() {
+ return metricsMaster;
}
/**
@@ -683,14 +332,11 @@ MasterServices, Server {
*/
void initializeZKBasedSystemTrackers() throws IOException,
InterruptedException, KeeperException {
- this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
- this.catalogTracker.start();
-
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.assignmentManager = new AssignmentManager(this, serverManager,
- this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
+ this.catalogTracker, this.balancer, this.service, this.metricsMaster,
this.tableLockManager);
zooKeeper.registerListenerFirst(assignmentManager);
@@ -721,74 +367,26 @@ MasterServices, Server {
}
/**
- * Create CatalogTracker.
- * In its own method so can intercept and mock it over in tests.
- * @param zk If zk is null, we'll create an instance (and shut it down
- * when {@link #stop(String)} is called) else we'll use what is passed.
- * @param conf
- * @param abortable If fatal exception we'll call abort on this. May be null.
- * If it is we'll use the Connection associated with the passed
- * {@link Configuration} as our {@link Abortable}.
- * ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
- * @throws IOException
- */
- CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
- final Configuration conf, Abortable abortable)
- throws IOException {
- return new CatalogTracker(zk, conf, abortable);
- }
-
- // Check if we should stop every 100ms
- private Sleeper stopSleeper = new Sleeper(100, this);
-
- private void loop() {
- long lastMsgTs = 0l;
- long now = 0l;
- while (!this.stopped) {
- now = System.currentTimeMillis();
- if ((now - lastMsgTs) >= this.msgInterval) {
- doMetrics();
- lastMsgTs = System.currentTimeMillis();
- }
- stopSleeper.sleep();
- }
- }
-
- /**
- * Emit the HMaster metrics, such as region in transition metrics.
- * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
- */
- private void doMetrics() {
- try {
- this.assignmentManager.updateRegionsInTransitionMetrics();
- } catch (Throwable e) {
- LOG.error("Couldn't update metrics: " + e.getMessage());
- }
- }
-
- /**
* Finish initialization of HMaster after becoming the primary master.
*
* <ol>
* <li>Initialize master components - file system manager, server manager,
- * assignment manager, region server tracker, catalog tracker, etc</li>
- * <li>Start necessary service threads - rpc server, info server,
+ * assignment manager, region server tracker, etc</li>
+ * <li>Start necessary service threads - balancer, catalog janitor,
* executor services, etc</li>
* <li>Set cluster as UP in ZooKeeper</li>
* <li>Wait for RegionServers to check-in</li>
* <li>Split logs and perform data recovery, if necessary</li>
- * <li>Ensure assignment of meta regions<li>
+ * <li>Ensure assignment of meta/namespace regions<li>
* <li>Handle either fresh cluster start or master failover</li>
* </ol>
*
- * @param masterRecovery
- *
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
- private void finishInitialization(MonitoredTask status, boolean masterRecovery)
- throws IOException, InterruptedException, KeeperException {
+ private void finishActiveMasterInitialization(MonitoredTask status)
+ throws IOException, InterruptedException, KeeperException {
isActiveMaster = true;
@@ -802,44 +400,31 @@ MasterServices, Server {
this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
- this.fileSystemManager = new MasterFileSystem(this, this, masterRecovery);
-
- this.tableDescriptors =
- new FSTableDescriptors(this.fileSystemManager.getFileSystem(),
- this.fileSystemManager.getRootDir());
+ this.fileSystemManager = new MasterFileSystem(this, this);
// publish cluster ID
status.setStatus("Publishing Cluster ID in ZooKeeper");
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
+ this.serverManager = createServerManager(this, this);
- if (!masterRecovery) {
- this.executorService = new ExecutorService(getServerName().toShortString());
- this.serverManager = createServerManager(this, this);
- }
-
- //Initialize table lock manager, and ensure that all write locks held previously
- //are invalidated
- this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
- if (!masterRecovery) {
- this.tableLockManager.reapWriteLocks();
- }
+ // Invalidate all write locks held previously
+ this.tableLockManager.reapWriteLocks();
status.setStatus("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
- if (!masterRecovery) {
- // initialize master side coprocessors before we start handling requests
- status.setStatus("Initializing master coprocessors");
- this.cpHost = new MasterCoprocessorHost(this, this.conf);
+ // initialize master side coprocessors before we start handling requests
+ status.setStatus("Initializing master coprocessors");
+ this.cpHost = new MasterCoprocessorHost(this, this.conf);
+
+ // start up all service threads.
+ status.setStatus("Initializing master service threads");
+ startServiceThreads();
- spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
+ // Wake up this server to check in
+ sleeper.skipSleepCycle();
- // start up all service threads.
- status.setStatus("Initializing master service threads");
- startServiceThreads();
- }
-
- // Wait for region servers to report in.
+ // Wait for region servers to report in
this.serverManager.waitForRegionServers(status);
// Check zk for region servers that are up but didn't register
for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
@@ -850,9 +435,7 @@ MasterServices, Server {
}
}
- if (!masterRecovery) {
- this.assignmentManager.startTimeOutMonitor();
- }
+ this.assignmentManager.startTimeOutMonitor();
// get a list for previously failed RS which need log splitting work
// we recover hbase:meta region servers inside master initialization and
@@ -888,12 +471,19 @@ MasterServices, Server {
this.balancer.setMasterServices(this);
this.balancer.initialize();
+ // Wait for regionserver to finish initialization.
+ while (!isOnline()) {
+ synchronized (online) {
+ online.wait(100);
+ }
+ }
+
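
The new wait loop above parks the active-master initialization until the embedded
region-server side of the process reports itself online, re-checking isOnline() and waiting
on the online monitor in 100 ms slices before meta assignment proceeds. A small,
self-contained sketch of that bounded wait/notify pattern (OnlineGate is an invented name;
in the real code the flag is presumably set and the monitor notified by the region-server
startup path):

    // Hypothetical sketch of waiting on an "online" flag with a monitor; names invented.
    class OnlineGate {
      private final Object online = new Object();
      private volatile boolean isOnline = false;

      void waitUntilOnline() throws InterruptedException {
        while (!isOnline) {
          synchronized (online) {
            online.wait(100);   // bounded wait: re-check the flag at least every 100 ms
          }
        }
      }

      void markOnline() {
        isOnline = true;
        synchronized (online) { online.notifyAll(); }
      }
    }
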
// Make sure meta assigned before proceeding.
status.setStatus("Assigning Meta Region");
assignMeta(status, previouslyFailedMetaRSs);
// check if master is shutting down because above assignMeta could return even hbase:meta isn't
// assigned when master is shutting down
- if(this.stopped) return;
+ if(isStopped()) return;
status.setStatus("Submitting log splitting work for previously failed region servers");
// Master has recovered hbase:meta region server and we put
@@ -915,15 +505,15 @@ MasterServices, Server {
//set cluster status again after user regions are assigned
this.balancer.setClusterStatus(getClusterStatus());
- if (!masterRecovery) {
- // Start balancer and meta catalog janitor after meta and regions have
- // been assigned.
- status.setStatus("Starting balancer and catalog janitor");
- this.clusterStatusChore = getAndStartClusterStatusChore(this);
- this.balancerChore = getAndStartBalancerChore(this);
- this.catalogJanitorChore = new CatalogJanitor(this, this);
- startCatalogJanitorChore();
- }
+ // Start balancer and meta catalog janitor after meta and regions have
+ // been assigned.
+ status.setStatus("Starting balancer and catalog janitor");
+ this.clusterStatusChore = new ClusterStatusChore(this, balancer);
+ Threads.setDaemonThreadRunning(clusterStatusChore.getThread());
+ this.balancerChore = new BalancerChore(this);
+ Threads.setDaemonThreadRunning(balancerChore.getThread());
+ this.catalogJanitorChore = new CatalogJanitor(this, this);
+ Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
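
Instead of the removed getAndStartClusterStatusChore/getAndStartBalancerChore helpers
(visible lower in the diff), each chore is now constructed directly and run as a daemon
thread via Threads.setDaemonThreadRunning(chore.getThread()). A rough stand-alone sketch of
running a periodic chore on a daemon thread in plain Java (PeriodicChore is invented and is
not the HBase Chore class):

    // Hypothetical sketch of a periodic daemon "chore"; not the HBase Chore class.
    class PeriodicChore implements Runnable {
      private final String name;
      private final long periodMillis;
      private final Runnable task;

      PeriodicChore(String name, long periodMillis, Runnable task) {
        this.name = name; this.periodMillis = periodMillis; this.task = task;
      }

      @Override public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          task.run();
          try { Thread.sleep(periodMillis); } catch (InterruptedException e) { return; }
        }
      }

      Thread startAsDaemon() {
        Thread t = new Thread(this, name);
        t.setDaemon(true);   // daemon: does not keep the JVM alive at shutdown
        t.start();
        return t;
      }
    }
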
status.setStatus("Starting namespace manager");
initNamespace();
@@ -944,14 +534,12 @@ MasterServices, Server {
// master initialization. See HBASE-5916.
this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
- if (!masterRecovery) {
- if (this.cpHost != null) {
- // don't let cp initialization errors kill the master
- try {
- this.cpHost.postStartMaster();
- } catch (IOException ioe) {
- LOG.error("Coprocessor postStartMaster() hook failed", ioe);
- }
+ if (this.cpHost != null) {
+ // don't let cp initialization errors kill the master
+ try {
+ this.cpHost.postStartMaster();
+ } catch (IOException ioe) {
+ LOG.error("Coprocessor postStartMaster() hook failed", ioe);
}
}
}
@@ -1127,47 +715,16 @@ MasterServices, Server {
return this.tableDescriptors;
}
- /** @return InfoServer object. Maybe null.*/
- public InfoServer getInfoServer() {
- return this.infoServer;
- }
-
- @Override
- public Configuration getConfiguration() {
- return this.conf;
- }
-
@Override
public ServerManager getServerManager() {
return this.serverManager;
}
@Override
- public ExecutorService getExecutorService() {
- return this.executorService;
- }
-
- @Override
public MasterFileSystem getMasterFileSystem() {
return this.fileSystemManager;
}
- /**
- * Get the ZK wrapper object - needed by master_jsp.java
- * @return the zookeeper wrapper
- */
- public ZooKeeperWatcher getZooKeeperWatcher() {
- return this.zooKeeper;
- }
-
- public ActiveMasterManager getActiveMasterManager() {
- return this.activeMasterManager;
- }
-
- public MasterAddressTracker getMasterAddressTracker() {
- return this.masterAddressTracker;
- }
-
/*
* Start up all services. If any of these threads gets an unhandled exception
* then they just die with a logged message. This should be fine because
@@ -1175,99 +732,65 @@ MasterServices, Server {
* as OOMEs; it should be lightly loaded. See what HRegionServer does if
* need to install an unexpected exception handler.
*/
- void startServiceThreads() throws IOException{
+ private void startServiceThreads() throws IOException{
// Start the executor service pools
- this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
+ this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
conf.getInt("hbase.master.executor.openregion.threads", 5));
- this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
+ this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
conf.getInt("hbase.master.executor.closeregion.threads", 5));
- this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
+ this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 5));
- this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
+ this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 5));
- this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
+ this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
conf.getInt("hbase.master.executor.logreplayops.threads", 10));
// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
// tables.
- this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+ this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
// Start log cleaner thread
- String n = Thread.currentThread().getName();
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
this.logCleaner =
new LogCleaner(cleanerInterval,
this, conf, getMasterFileSystem().getFileSystem(),
getMasterFileSystem().getOldLogDir());
- Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
+ Threads.setDaemonThreadRunning(logCleaner.getThread(), getName() + ".oldLogCleaner");
//start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir);
- Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
-
- // Start the health checker
- if (this.healthCheckChore != null) {
- Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
- }
+ Threads.setDaemonThreadRunning(hfileCleaner.getThread(),
+ getName() + ".archivedHFileCleaner");
- // Start allowing requests to happen.
- this.rpcServer.openServer();
- this.rpcServerOpen = true;
+ serviceStarted = true;
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}
}
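
startServiceThreads() sizes each executor pool from configuration (defaults of 5 threads, or
10 for log-replay operations), starts the log and HFile cleaners as daemon threads, and
finally flips the serviceStarted flag that replaces the old rpcServerOpen marker. A small
sketch of the config-driven pool sizing idea using plain java.util.concurrent; the property
key and default below are illustrative only, not HBase configuration keys:

    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical sketch: size a pool from configuration with a default, as the
    // master does for its per-operation executor pools. Key and default are invented.
    class ConfiguredPools {
      static ExecutorService openRegionPool(Properties conf) {
        int threads = Integer.parseInt(
            conf.getProperty("example.master.executor.openregion.threads", "5"));
        return Executors.newFixedThreadPool(threads);
      }

      public static void main(String[] args) {
        ExecutorService pool = ConfiguredPools.openRegionPool(new Properties());
        pool.submit(() -> System.out.println("open-region task"));
        pool.shutdown();
      }
    }
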
- /**
- * Use this when trying to figure when its ok to send in rpcs. Used by tests.
- * @return True if we have successfully run {@link RpcServer#openServer()}
- */
- boolean isRpcServerOpen() {
- return this.rpcServerOpen;
- }
-
- private void stopServiceThreads() {
+ protected void stopServiceThreads() {
+ super.stopServiceThreads();
+ stopChores();
+ // Wait for all the remaining region servers to report in IFF we were
+ // running a cluster shutdown AND we were NOT aborting.
+ if (!isAborted() && this.serverManager != null &&
+ this.serverManager.isClusterShutdown()) {
+ this.serverManager.letRegionServersShutdown();
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping service threads");
}
- if (this.rpcServer != null) this.rpcServer.stop();
- this.rpcServerOpen = false;
// Clean up and close up shop
if (this.logCleaner!= null) this.logCleaner.interrupt();
if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
-
- if (this.infoServer != null) {
- LOG.info("Stopping infoServer");
- try {
- this.infoServer.stop();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (this.executorService != null) this.executorService.shutdown();
- if (this.healthCheckChore != null) {
- this.healthCheckChore.interrupt();
- }
- if (this.pauseMonitor != null) {
- this.pauseMonitor.stop();
- }
- }
-
- private static Thread getAndStartClusterStatusChore(HMaster master) {
- if (master == null || master.balancer == null) {
- return null;
- }
- Chore chore = new ClusterStatusChore(master, master.balancer);
- return Threads.setDaemonThreadRunning(chore.getThread());
- }
-
- private static Thread getAndStartBalancerChore(final HMaster master) {
- // Start up the load balancer chore
- Chore chore = new BalancerChore(master);
- return Threads.setDaemonThreadRunning(chore.getThread());
+ if (this.activeMasterManager != null) this.activeMasterManager.stop();
+ if (this.serverManager != null) this.serverManager.stop();
+ if (this.assignmentManager != null) this.assignmentManager.stop();
+ if (this.fileSystemManager != null) this.fileSystemManager.stop();
+ if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
}
private void stopChores() {
@@ -1288,131 +811,25 @@ MasterServices, Server {
}
}
- @Override
- public RegionServerStartupResponse regionServerStartup(
- RpcController controller, RegionServerStartupRequest request) throws ServiceException {
- // Register with server manager
- try {
- InetAddress ia = getRemoteInetAddress(request.getPort(), request.getServerStartCode());
- ServerName rs = this.serverManager.regionServerStartup(ia, request.getPort(),
- request.getServerStartCode(), request.getServerCurrentTime());
-
- // Send back some config info
- RegionServerStartupResponse.Builder resp = createConfigurationSubset();
- NameStringPair.Builder entry = NameStringPair.newBuilder()
- .setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
- .setValue(rs.getHostname());
- resp.addMapEntries(entry.build());
-
- return resp.build();
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-
/**
* @return Get remote side's InetAddress
* @throws UnknownHostException
*/
- InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
- throws UnknownHostException {
+ InetAddress getRemoteInetAddress(final int port,
+ final long serverStartCode) throws UnknownHostException {
// Do it out here in its own little method so can fake an address when
// mocking up in tests.
- return RpcServer.getRemoteIp();
- }
-
- /**
- * @return Subset of configuration to pass initializing regionservers: e.g.
- * the filesystem to use and root directory to use.
- */
- protected RegionServerStartupResponse.Builder createConfigurationSubset() {
- RegionServerStartupResponse.Builder resp = addConfig(
- RegionServerStartupResponse.newBuilder(), HConstants.HBASE_DIR);
- return addConfig(resp, "fs.default.name");
- }
-
- private RegionServerStartupResponse.Builder addConfig(
- final RegionServerStartupResponse.Builder resp, final String key) {
- NameStringPair.Builder entry = NameStringPair.newBuilder()
- .setName(key)
- .setValue(this.conf.get(key));
- resp.addMapEntries(entry.build());
- return resp;
- }
-
- @Override
- public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller,
- GetLastFlushedSequenceIdRequest request) throws ServiceException {
- byte[] regionName = request.getRegionName().toByteArray();
- long seqId = serverManager.getLastFlushedSequenceId(regionName);
- return ResponseConverter.buildGetLastFlushedSequenceIdResponse(seqId);
- }
+ InetAddress ia = RpcServer.getRemoteIp();
- @Override
- public RegionServerReportResponse regionServerReport(
- RpcController controller, RegionServerReportRequest request) throws ServiceException {
- try {
- ClusterStatusProtos.ServerLoad sl = request.getLoad();
- ServerName serverName = ProtobufUtil.toServerName(request.getServer());
- ServerLoad oldLoad = serverManager.getLoad(serverName);
- this.serverManager.regionServerReport(serverName, new ServerLoad(sl));
- if (sl != null && this.metricsMaster != null) {
- // Up our metrics.
- this.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
- - (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0));
+ // The call could be from the local regionserver,
+ // in which case, there is no remote address.
+ if (ia == null && serverStartCode == startcode) {
+ InetSocketAddress isa = rpcServices.getSocketAddress();
+ if (isa != null && isa.getPort() == port) {
+ ia = isa.getAddress();
}
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
-
- return RegionServerReportResponse.newBuilder().build();
- }
-
- @Override
- public ReportRSFatalErrorResponse reportRSFatalError(
- RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
- String errorText = request.getErrorMessage();
- ServerName sn = ProtobufUtil.toServerName(request.getServer());
- String msg = "Region server " + sn +
- " reported a fatal error:\n" + errorText;
- LOG.error(msg);
- rsFatals.add(msg);
-
- return ReportRSFatalErrorResponse.newBuilder().build();
- }
-
- public boolean isMasterRunning() {
- return !isStopped();
- }
-
- @Override
- public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
- throws ServiceException {
- return IsMasterRunningResponse.newBuilder().setIsMasterRunning(isMasterRunning()).build();
- }
-
- @Override
- public RunCatalogScanResponse runCatalogScan(RpcController c,
- RunCatalogScanRequest req) throws ServiceException {
- try {
- return ResponseConverter.buildRunCatalogScanResponse(catalogJanitorChore.scan());
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
}
- }
-
- @Override
- public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c,
- EnableCatalogJanitorRequest req) throws ServiceException {
- return EnableCatalogJanitorResponse.newBuilder().
- setPrevValue(catalogJanitorChore.setEnabled(req.getEnable())).build();
- }
-
- @Override
- public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c,
- IsCatalogJanitorEnabledRequest req) throws ServiceException {
- boolean isEnabled = catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
- return IsCatalogJanitorEnabledResponse.newBuilder().setValue(isEnabled).build();
+ return ia;
}
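
getRemoteInetAddress() gains a fallback for the co-located case: when the report comes from
the region server embedded in the same process, RpcServer.getRemoteIp() can return null, so
the method falls back to the RPC services' own socket address, but only if the caller's
start code and port match this server. A hedged sketch of that guard with invented helper
names (RemoteAddressResolver is not an HBase class):

    import java.net.InetAddress;
    import java.net.InetSocketAddress;

    // Hypothetical sketch of the "local caller" fallback; helper types are invented.
    class RemoteAddressResolver {
      private final long myStartCode;
      private final InetSocketAddress myRpcAddress;

      RemoteAddressResolver(long myStartCode, InetSocketAddress myRpcAddress) {
        this.myStartCode = myStartCode;
        this.myRpcAddress = myRpcAddress;
      }

      InetAddress resolve(InetAddress remoteIp, int callerPort, long callerStartCode) {
        if (remoteIp != null) {
          return remoteIp;                    // normal case: a genuinely remote caller
        }
        // No remote address: the call may have come from the region server running in
        // this same process, so only accept it if start code and port both match.
        if (callerStartCode == myStartCode
            && myRpcAddress != null && myRpcAddress.getPort() == callerPort) {
          return myRpcAddress.getAddress();
        }
        return null;
      }
    }
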
/**
@@ -1515,54 +932,6 @@ MasterServices, Server {
return balancerRan;
}
- @Override
- public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException {
- try {
- return BalanceResponse.newBuilder().setBalancerRan(balance()).build();
- } catch (IOException ex) {
- throw new ServiceException(ex);
- }
- }
-
- enum BalanceSwitchMode {
- SYNC,
- ASYNC
- }
-
- /**
- * Assigns balancer switch according to BalanceSwitchMode
- * @param b new balancer switch
- * @param mode BalanceSwitchMode
- * @return old balancer switch
- */
- public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
- boolean oldValue = this.loadBalancerTracker.isBalancerOn();
- boolean newValue = b;
- try {
- if (this.cpHost != null) {
- newValue = this.cpHost.preBalanceSwitch(newValue);
- }
- try {
- if (mode == BalanceSwitchMode.SYNC) {
- synchronized (this.balancer) {
- this.loadBalancerTracker.setBalancerOn(newValue);
- }
- } else {
- this.loadBalancerTracker.setBalancerOn(newValue);
- }
- } catch (KeeperException ke) {
- throw new IOException(ke);
- }
- LOG.info(getClientIdAuditPrefix() + " set balanceSwitch=" + newValue);
- if (this.cpHost != null) {
- this.cpHost.postBalanceSwitch(oldValue, newValue);
- }
- } catch (IOException ioe) {
- LOG.warn("Error flipping balance switch", ioe);
- }
- return oldValue;
- }
-
/**
* @return Client info for use as prefix on an audit log string; who did an action
*/
@@ -1571,26 +940,6 @@ MasterServices, Server {
RequestContext.get().getRemoteAddress();
}
- public boolean synchronousBalanceSwitch(final boolean b) throws IOException {
- return switchBalancer(b, BalanceSwitchMode.SYNC);
- }
-
- public boolean balanceSwitch(final boolean b) throws IOException {
- return switchBalancer(b, BalanceSwitchMode.ASYNC);
- }
-
- @Override
- public SetBalancerRunningResponse setBalancerRunning(
- RpcController controller, SetBalancerRunningRequest req) throws ServiceException {
- try {
- boolean prevValue = (req.getSynchronous())?
- synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn());
- return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build();
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-
/**
* Switch for the background CatalogJanitor thread.
* Used for testing. The thread will continue to run. It will just be a noop
@@ -1602,90 +951,13 @@ MasterServices, Server {
}
@Override
- public DispatchMergingRegionsResponse dispatchMergingRegions(
- RpcController controller, DispatchMergingRegionsRequest request)
- throws ServiceException {
- final byte[] encodedNameOfRegionA = request.getRegionA().getValue()
- .toByteArray();
- final byte[] encodedNameOfRegionB = request.getRegionB().getValue()
- .toByteArray();
- final boolean forcible = request.getForcible();
- if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME
- || request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
- LOG.warn("mergeRegions specifier type: expected: "
- + RegionSpecifierType.ENCODED_REGION_NAME + " actual: region_a="
- + request.getRegionA().getType() + ", region_b="
- + request.getRegionB().getType());
- }
- RegionState regionStateA = assignmentManager.getRegionStates()
- .getRegionState(Bytes.toString(encodedNameOfRegionA));
- RegionState regionStateB = assignmentManager.getRegionStates()
- .getRegionState(Bytes.toString(encodedNameOfRegionB));
- if (regionStateA == null || regionStateB == null) {
- throw new ServiceException(new UnknownRegionException(
- Bytes.toStringBinary(regionStateA == null ? encodedNameOfRegionA
- : encodedNameOfRegionB)));
- }
-
- if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
- throw new ServiceException(new MergeRegionException(
- "Unable to merge regions not online " + regionStateA + ", " + regionStateB));
- }
-
- HRegionInfo regionInfoA = regionStateA.getRegion();
- HRegionInfo regionInfoB = regionStateB.getRegion();
- if (regionInfoA.compareTo(regionInfoB) == 0) {
- throw new ServiceException(new MergeRegionException(
- "Unable to merge a region to itself " + regionInfoA + ", " + regionInfoB));
- }
-
- if (!forcible && !HRegionInfo.areAdjacent(regionInfoA, regionInfoB)) {
- throw new ServiceException(new MergeRegionException(
- "Unable to merge not adjacent regions "
- + regionInfoA.getRegionNameAsString() + ", "
- + regionInfoB.getRegionNameAsString()
- + " where forcible = " + forcible));
- }
-
- try {
- dispatchMergingRegions(regionInfoA, regionInfoB, forcible);
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
-
- return DispatchMergingRegionsResponse.newBuilder().build();
- }
-
- @Override
public void dispatchMergingRegions(final HRegionInfo region_a,
final HRegionInfo region_b, final boolean forcible) throws IOException {
checkInitialized();
- this.executorService.submit(new DispatchMergingRegionHandler(this,
+ this.service.submit(new DispatchMergingRegionHandler(this,
this.catalogJanitorChore, region_a, region_b, forcible));
}
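The dispatchMergingRegions path above is normally driven from the client side. A hedged sketch of the corresponding admin call follows, assuming the 0.98-era HBaseAdmin#mergeRegions API and placeholder encoded region names.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class MergeRegionsExample {
  public static void main(String[] args) throws Exception {
    // Placeholder encoded region names; real values come from hbase:meta or
    // the shell's region listing.
    byte[] encodedA = Bytes.toBytes("1111111111111111111111111111aaaa");
    byte[] encodedB = Bytes.toBytes("2222222222222222222222222222bbbb");
    HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
    try {
      // forcible = false: the master rejects non-adjacent regions, mirroring
      // the precondition checks in the RPC method shown above.
      admin.mergeRegions(encodedA, encodedB, false);
    } finally {
      admin.close();
    }
  }
}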
- @Override
- public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
- throws ServiceException {
- final byte [] encodedRegionName = req.getRegion().getValue().toByteArray();
- RegionSpecifierType type = req.getRegion().getType();
- final byte [] destServerName = (req.hasDestServerName())?
- Bytes.toBytes(ProtobufUtil.toServerName(req.getDestServerName()).getServerName()):null;
- MoveRegionResponse mrr = MoveRegionResponse.newBuilder().build();
-
- if (type != RegionSpecifierType.ENCODED_REGION_NAME) {
- LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
- + " actual: " + type);
- }
-
- try {
- move(encodedRegionName, destServerName);
- } catch (HBaseIOException ioe) {
- throw new ServiceException(ioe);
- }
- return mrr;
- }
-
void move(final byte[] encodedRegionName,
final byte[] destServerName) throws HBaseIOException {
RegionState regionState = assignmentManager.getRegionStates().
@@ -1736,9 +1008,8 @@ MasterServices, Server {
@Override
public void createTable(HTableDescriptor hTableDescriptor,
- byte [][] splitKeys)
- throws IOException {
- if (!isMasterRunning()) {
+ byte [][] splitKeys) throws IOException {
+ if (isStopped()) {
throw new MasterNotRunningException();
}
@@ -1752,7 +1023,7 @@ MasterServices, Server {
cpHost.preCreateTable(hTableDescriptor, newRegions);
}
LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
- this.executorService.submit(new CreateTableHandler(this,
+ this.service.submit(new CreateTableHandler(this,
this.fileSystemManager, hTableDescriptor, conf,
newRegions, this).prepare());
if (cpHost != null) {
@@ -1852,6 +1123,53 @@ MasterServices, Server {
}
}
+ private void startActiveMasterManager() {
+ activeMasterManager = new ActiveMasterManager(zooKeeper, serverName, this);
+ // Start a thread to try to become the active master, so we won't block here
+ Threads.setDaemonThreadRunning(new Thread(new Runnable() {
+ public void run() {
+ int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
+ HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+ // If we're a backup master, stall until a primary master writes its address
+ if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
+ HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
+ LOG.debug("HMaster started in backup mode. "
+ + "Stalling until master znode is written.");
+ // This will only be a minute or so while the cluster starts up,
+ // so don't worry about setting watches on the parent znode
+ while (!activeMasterManager.hasActiveMaster()) {
+ LOG.debug("Waiting for master address ZNode to be written "
+ + "(Also watching cluster state node)");
+ Threads.sleep(timeout);
+ }
+ }
+ MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
+ status.setDescription("Master startup");
+ try {
+ if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
+ finishActiveMasterInitialization(status);
+ }
+ } catch (Throwable t) {
+ status.setStatus("Failed to become active: " + t.getMessage());
+ LOG.fatal("Failed to become active master", t);
+ // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
+ if (t instanceof NoClassDefFoundError &&
+ t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
+ // improved error message for this special case
+ abort("HBase is having a problem with its Hadoop jars. You may need to "
+ + "recompile HBase against Hadoop version "
+ + org.apache.hadoop.util.VersionInfo.getVersion()
+ + " or change your hadoop jars to start properly", t);
+ } else {
+ abort("Unhandled exception. Starting shutdown.", t);
+ }
+ } finally {
+ status.cleanup();
+ }
+ }
+ }, "ActiveMasterManager"));
+ }
+
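The backup-master stall loop above is controlled by a single boolean flag. A small illustrative snippet follows, assuming the flag is the usual hbase.master.backup setting behind HConstants.MASTER_TYPE_BACKUP; in practice it is normally set in hbase-site.xml or by local-master-backup.sh rather than in code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class BackupMasterConfExample {
  public static void main(String[] args) {
    // Illustrative only: flips the flag that sends a master into the
    // backup-mode wait loop shown above.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.MASTER_TYPE_BACKUP, true);
    System.out.println("Backup master mode: "
        + conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
            HConstants.DEFAULT_MASTER_TYPE_BACKUP));
  }
}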
private void checkCompression(final HTableDescriptor htd)
throws IOException {
if (!this.masterCheckCompression) return;
@@ -1867,19 +1185,6 @@ MasterServices, Server {
CompressionTest.testCompression(hcd.getCompactionCompression());
}
- @Override
- public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
- throws ServiceException {
- HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
- byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
- try {
- createTable(hTableDescriptor,splitKeys);
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return CreateTableResponse.newBuilder().build();
- }
-
private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
byte[][] splitKeys) {
HRegionInfo[] hRegionInfos = null;
@@ -1912,52 +1217,13 @@ MasterServices, Server {
cpHost.preDeleteTable(tableName);
}
LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
- this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare());
+ this.service.submit(new DeleteTableHandler(tableName, this, this).prepare());
if (cpHost != null) {
cpHost.postDeleteTable(tableName);
}
}
@Override
- public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest request)
- throws ServiceException {
- try {
- deleteTable(ProtobufUtil.toTableName(request.getTableName()));
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return DeleteTableResponse.newBuilder().build();
- }
-
- /**
- * Get the number of regions of the table that have been updated by the alter.
- *
- * @return Pair indicating alter progress: Pair.getFirst is the number of
- * regions yet to be updated; Pair.getSecond is the total number of
- * regions in the table
- * @throws IOException
- */
- @Override
- public GetSchemaAlterStatusResponse getSchemaAlterStatus(
- RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException {
- // TODO: currently, we query using the table name on the client side. this
- // may overlap with other table operations or the table operation may
- // have completed before querying this API. We need to refactor to a
- // transaction system in the future to avoid these ambiguities.
- TableName tableName = ProtobufUtil.toTableName(req.getTableName());
-
- try {
- Pair<Integer,Integer> pair = this.assignmentManager.getReopenStatus(tableName);
- GetSchemaAlterStatusResponse.Builder ret = GetSchemaAlterStatusResponse.newBuilder();
- ret.setYetToUpdateRegions(pair.getFirst());
- ret.setTotalRegions(pair.getSecond());
- return ret.build();
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-
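The alter status removed here is what clients poll while a schema change reopens regions. A minimal sketch, assuming the 0.98-era HBaseAdmin#getAlterStatus(TableName) API and a hypothetical table name "mytable":

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Pair;

public class AlterStatusExample {
  public static void main(String[] args) throws Exception {
    HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
    try {
      // getFirst(): regions still to be reopened with the new schema;
      // getSecond(): total number of regions of the table.
      Pair<Integer, Integer> status =
          admin.getAlterStatus(TableName.valueOf("mytable"));
      System.out.println(status.getFirst() + " of " + status.getSecond()
          + " regions still pending");
    } finally {
      admin.close();
    }
  }
}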
- @Override
public void addColumn(final TableName tableName, final HColumnDescriptor column)
throws IOException {
checkInitialized();
@@ -1974,18 +1240,6 @@ MasterServices, Server {
}
@Override
- public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req)
- throws ServiceException {
- try {
- addColumn(ProtobufUtil.toTableName(req.getTableName()),
- HColumnDescriptor.convert(req.getColumnFamilies()));
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return AddColumnResponse.newBuilder().build();
- }
-
- @Override
public void modifyColumn(TableName tableName, HColumnDescriptor descriptor)
throws IOException {
checkInitialized();
@@ -2004,18 +1258,6 @@ MasterServices, Server {
}
@Override
- public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req)
- throws ServiceException {
- try {
- modifyColumn(ProtobufUtil.toTableName(req.getTableName()),
- HColumnDescriptor.convert(req.getColumnFamilies()));
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return ModifyColumnResponse.newBuilder().build();
- }
-
- @Override
public void deleteColumn(final TableName tableName, final byte[] columnName)
throws IOException {
checkInitialized();
@@ -2032,25 +1274,13 @@ MasterServices, Server {
}
@Override
- public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req)
- throws ServiceException {
- try {
- deleteColumn(ProtobufUtil.toTableName(req.getTableName()),
- req.getColumnName().toByteArray());
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return DeleteColumnResponse.newBuilder().build();
- }
-
- @Override
public void enableTable(final TableName tableName) throws IOException {
checkInitialized();
if (cpHost != null) {
cpHost.preEnableTable(tableName);
}
LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
- this.executorService.submit(new EnableTableHandler(this, tableName,
+ this.service.submit(new EnableTableHandler(this, tableName,
catalogTracker, assignmentManager, tableLockManager, false).prepare());
if (cpHost != null) {
cpHost.postEnableTable(tableName);
@@ -2058,41 +1288,19 @@ MasterServices, Server {
}
@Override
- public EnableTableResponse enableTable(RpcController controller, EnableTableRequest request)
- throws ServiceException {
- try {
- enableTable(ProtobufUtil.toTableName(request.getTableName()));
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return EnableTableResponse.newBuilder().build();
- }
-
- @Override
public void disableTable(final TableName tableName) throws IOException {
checkInitialized();
if (cpHost != null) {
cpHost.preDisableTable(tableName);
}
LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
- this.executorService.submit(new DisableTableHandler(this, tableName,
+ this.service.submit(new DisableTableHandler(this, tableName,
catalogTracker, assignmentManager, tableLockManager, false).prepare());
if (cpHost != null) {
cpHost.postDisableTable(tableName);
}
}
- @Override
- public DisableTableResponse disableTable(RpcController controller, DisableTableRequest request)
- throws ServiceException {
- try {
- disableTable(ProtobufUtil.toTableName(request.getTableName()));
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return DisableTableResponse.newBuilder().build();
- }
-
/**
* Return the region and current deployment for the region containing
* the given row. If the region cannot be found, returns null. If it
@@ -2144,18 +1352,6 @@ MasterServices, Server {
}
@Override
- public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req)
- throws ServiceException {
- try {
- modifyTable(ProtobufUtil.toTableName(req.getTableName()),
- HTableDescriptor.convert(req.getTableSchema()));
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- return ModifyTableResponse.newBuilder().build();
- }
-
- @Override
public void checkTableModifiable(final TableName tableName)
throws IOException, TableNotFoundException, TableNotDisabledException {
if (isCatalogTable(tableName)) {
@@ -2170,19 +1366,6 @@ MasterServices, Server {
}
}
- @Override
- public GetClusterStatusResponse getClusterStatus(RpcController controller,
- GetClusterStatusRequest req)
- throws ServiceException {
- GetClusterStatusResponse.Builder response = GetClusterStatusResponse.newBuilder();
- try {
- response.setClusterStatus(getClusterStatus().convert());
- } catch (InterruptedIOException e) {
- throw new ServiceException(e);
- }
- return response.build();
- }
-
/**
* @return cluster status
*/
@@ -2235,18 +1418,7 @@ MasterServices, Server {
this.serverName,
backupMasters,
this.assignmentManager.getRegionStates().getRegionsInTransition(),
- this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn());
- }
-
- public String getClusterId() {
- if (fileSystemManager == null) {
- return "";
- }
- ClusterId id = fileSystemManager.getClusterId();
- if (id == null) {
- return "";
- }
- return id.toString();
+ this.getMasterCoprocessors(), this.loadBalancerTracker.isBalancerOn());
}
/**
@@ -2264,7 +1436,7 @@ MasterServices, Server {
* @return timestamp in millis when HMaster was started.
*/
public long getMasterStartTime() {
- return masterStartTime;
+ return startcode;
}
/**
@@ -2286,9 +1458,8 @@ MasterServices, Server {
/**
* @return array of coprocessor SimpleNames.
*/
- public String[] getCoprocessors() {
- Set<String> masterCoprocessors =
- getCoprocessorHost().getCoprocessors();
+ public String[] getMasterCoprocessors() {
+ Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
}
@@ -2299,110 +1470,8 @@ MasterServices, Server {
LOG.fatal("Master server abort: loaded coprocessors are: " +
getLoadedCoprocessors());
}
-
- if (abortNow(msg, t)) {
- if (t != null) LOG.fatal(msg, t);
- else LOG.fatal(msg);
- this.abort = true;
- stop("Aborting");
- }
- }
-
- /**
- * We do the following in a different thread. If it is not completed
- * in time, we will time it out and assume it is not easy to recover.
- *
- * 1. Create a new ZK session. (since our current one is expired)
- * 2. Try to become a primary master again
- * 3. Initialize all ZK based system trackers.
- * 4. Assign meta. (they are already assigned, but we need to update our
- * internal memory state to reflect it)
- * 5. Process any RIT if any during the process of our recovery.
- *
- * @return True if we could successfully recover from ZK session expiry.
- * @throws InterruptedException
- * @throws IOException
- * @throws KeeperException
- * @throws ExecutionException
- */
- private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
- IOException, KeeperException, ExecutionException {
-
- this.zooKeeper.unregisterAllListeners();
- // add back listeners which were registered before master initialization
- // because they won't be added back in below Master re-initialization code
- if (this.registeredZKListenersBeforeRecovery != null) {
- for (ZooKeeperListener curListener : this.registeredZKListenersBeforeRecovery) {
- this.zooKeeper.registerListener(curListener);
- }
- }
-
- this.zooKeeper.reconnectAfterExpiration();
-
- Callable<Boolean> callable = new Callable<Boolean> () {
- @Override
- public Boolean call() throws InterruptedException,
- IOException, KeeperException {
- MonitoredTask status =
- TaskMonitor.get().createStatus("Recovering expired ZK session");
- try {
- if (!becomeActiveMaster(status)) {
- return Boolean.FALSE;
- }
- serverShutdownHandlerEnabled = false;
- initialized = false;
- finishInitialization(status, true);
- return !stopped;
- } finally {
- status.cleanup();
- }
- }
- };
-
- long timeout =
- conf.getLong("hbase.master.zksession.recover.timeout", 300000);
- java.util.concurrent.ExecutorService executor =
- Executors.newSingleThreadExecutor();
- Future<Boolean> result = executor.submit(callable);
- executor.shutdown();
- if (executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)
- && result.isDone()) {
- Boolean recovered = result.get();
- if (recovered != null) {
- return recovered.booleanValue();
- }
- }
- executor.shutdownNow();
- return false;
- }
-
- /**
- * Check to see if the current trigger for abort is due to ZooKeeper session
- * expiry, and if so, whether we can recover from it.
- *
- * @param msg Original abort message
- * @param t The cause for current abort request
- * @return true if we should proceed with the abort operation, false otherwise.
- */
- private boolean abortNow(final String msg, final Throwable t) {
- if (!this.isActiveMaster || this.stopped) {
- return true;
- }
-
- boolean failFast = conf.getBoolean("fail.fast.expired.active.master", false);
- if (t != null && t instanceof KeeperException.SessionExpiredException
- && !failFast) {
- try {
- LOG.info("Primary Master trying to recover from ZooKeeper session " +
- "expiry.");
- return !tryRecoveringExpiredZKSession();
- } catch (Throwable newT) {
- LOG.error("Primary master encountered unexpected exception while " +
- "trying to recover from ZooKeeper session" +
- " expiry. Proceeding with server abort.", newT);
- }
- }
- return true;
+ if (t != null) LOG.fatal(msg, t);
+ stop(msg);
}
@Override
@@ -2411,7 +1480,7 @@ MasterServices, Server {
}
@Override
- public MasterCoprocessorHost getCoprocessorHost() {
+ public MasterCoprocessorHost getMasterCoprocessorHost() {
return cpHost;
}
@@ -2421,28 +1490,15 @@ MasterServices, Server {
}
@Override
- public CatalogTracker getCatalogTracker() {
- return catalogTracker;
- }
-
- @Override
public AssignmentManager getAssignmentManager() {
return this.assignmentManager;
}
- @Override
- public TableLockManager getTableLockManager() {
- return this.tableLockManager;
- }
-
public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
return rsFatals;
}
public void shutdown() {
- if (spanReceiverHost != null) {
- spanReceiverHost.closeReceivers();
- }
if (cpHost != null) {
try {
cpHost.preShutdown();
@@ -2450,29 +1506,21 @@ MasterServices, Server {
LOG.error("Error call master coprocessor preShutdown()", ioe);
}
}
- if (mxBean != null) {
- MBeanUtil.unregisterMBean(mxBean);
- mxBean = null;
+ if (this.assignmentManager != null) {
+ this.assignmentManager.shutdown();
}
- if (this.assignmentManager != null) this.assignmentManager.shutdown();
- if (this.serverManager != null) this.serverManager.shutdownCluster();
try {
if (this.clusterStatusTracker != null){
this.clusterStatusTracker.setClusterDown();
+ if (this.serverManager != null) {
+ this.serverManager.shutdownCluster();
+ }
}
} catch (KeeperException e) {
LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
}
}
- @Override
- public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request)
- throws ServiceException {
- LOG.info(getClientIdAuditPrefix() + " shutdown");
- shutdown();
- return ShutdownResponse.newBuilder().build();
- }
-
public void stopMaster() {
if (cpHost != null) {
try {
@@ -2484,49 +1532,26 @@ MasterServices, Server {
stop("Stopped by " + Thread.currentThread().getName());
}
- @Override
- public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request)
- throws ServiceException {
- LOG.info(getClientIdAuditPrefix() + " stop");
- stopMaster();
- return StopMasterResponse.newBuilder().build();
- }
-
- @Override
- public void stop(final String why) {
- LOG.info(why);
- this.stopped = true;
- // We wake up the stopSleeper to stop immediately
- stopSleeper.skipSleepCycle();
- // If we are a backup master, we need to interrupt wait
- if (this.activeMasterManager != null) {
- synchronized (this.activeMasterManager.clusterHasActiveMaster) {
- this.activeMasterManager.clusterHasActiveMaster.notifyAll();
- }
- }
- // If no region server is online then the master may get stuck waiting on hbase:meta to come online.
- // See HBASE-8422.
- if (this.catalogTracker != null && this.serverManager.getOnlineServers().isEmpty()) {
- this.catalogTracker.stop();
+ void checkServiceStarted() throws ServerNotRunningYetException {
+ if (!serviceStarted) {
+ throw new ServerNotRunningYetException("Server is not running yet");
}
}
- @Override
- public boolean isStopped() {
- return this.stopped;
- }
-
- @Override
- public boolean isAborted() {
- return this.abort;
- }
-
- void checkInitialized() throws PleaseHoldException {
+ void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
+ checkServiceStarted();
if (!this.initialized) {
throw new PleaseHoldException("Master is initializing");
}
}
+ void checkNamespaceManagerReady() throws IOException {
+ checkInitialized();
+ if (tableNamespaceManager == null ||
+ !tableNamespaceManager.isTableAvailableAndInitialized()) {
+ throw new IOException("Table Namespace Manager not ready yet, try again later");
+ }
+ }
/**
* Report whether this master is currently the active master or not.
* If not active master, we are parked on ZK waiting to become active.
@@ -2571,178 +1596,10 @@ MasterServices, Server {
return this.initializationBeforeMetaAssignment;
}
- @Override
- public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
- throws ServiceException {
- try {
- final byte [] regionName = req.getRegion().getValue().toByteArray();
- RegionSpecifierType type = req.getRegion().getType();
- AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
-
- checkInitialized();
- if (type != RegionSpecifierType.REGION_NAME) {
- LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
- + " actual: " + type);
- }
- HRegionInfo regionInfo = assignmentManager.getRegionStates().getRegionInfo(regionName);
- if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
- if (cpHost != null) {
- if (cpHost.preAssign(regionInfo)) {
- return arr;
- }
- }
- LOG.info(getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
- assignmentManager.assign(regionInfo, true, true);
- if (cpHost != null) {
- cpHost.postAssign(regionInfo);
- }
-
- return arr;
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-
public void assignRegion(HRegionInfo hri) {
assignmentManager.assign(hri, true);
}
- @Override
- public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
- throws ServiceException {
- try {
- final byte [] regionName = req.getRegion().getValue().toByteArray();
- RegionSpecifierType type = req.getRegion().getType();
- final boolean force = req.getForce();
- UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
-
- checkInitialized();
- if (type != RegionSpecifierType.REGION_NAME) {
- LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
- + " actual: " + type);
- }
- Pair<HRegionInfo, ServerName> pair =
- MetaReader.getRegion(this.catalogTracker, regionName);
- if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
- HRegionInfo hri = pair.getFirst();
- if (cpHost != null) {
- if (cpHost.preUnassign(hri, force)) {
- return urr;
- }
- }
- LOG.debug(getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
- + " in current location if it is online and reassign.force=" + force);
- this.assignmentManager.unassign(hri, force);
- if (this.assignmentManager.getRegionStates().isRegionOffline(hri)) {
- LOG.debug("Region " + hri.getRegionNameAsString()
- + " is not online on any region server, reassigning it.");
- assignRegion(hri);
- }
- if (cpHost != null) {
- cpHost.postUnassign(hri, force);
- }
-
- return urr;
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-
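The unassign RPC above has a client-side counterpart. An illustrative sketch, assuming the 0.98-era HBaseAdmin#unassign API and a placeholder full region name:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class UnassignRegionExample {
  public static void main(String[] args) throws Exception {
    // Placeholder full region name; real values come from hbase:meta.
    byte[] regionName =
        Bytes.toBytes("mytable,,1395776400000.abcdef0123456789abcdef0123456789.");
    HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
    try {
      // The second argument mirrors the 'force' flag handled by the removed
      // RPC method above.
      admin.unassign(regionName, true);
    } finally {
      admin.close();
    }
  }
}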
- /**
- * Get list of TableDescriptors for requested tables.
- * @param controller Unused (set to null).
- * @param req GetTableDescriptorsRequest that contains:
- * - tableNames: requested tables, or if empty, all are requested
- * @return GetTableDescriptorsResponse
- * @throws ServiceException
- */
- @Override
- public GetTableDescriptorsResponse getTableDescriptors(
- RpcController controller, GetTableDescriptorsRequest req) throws ServiceException {
- List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
- List<TableName> tableNameList = new ArrayList<TableName>();
- for(HBaseProtos.TableName tableNamePB: req.getTableNamesList()) {
- tableNameList.add(ProtobufUtil.toTableName(tableNamePB));
- }
- boolean bypass = false;
- if (this.cpHost != null) {
- try {
- bypass = this.cpHost.preGetTableDescriptors(tableNameList, descriptors);
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
-
- if (!bypass) {
- if (req.getTableNamesCount() == 0) {
- // request for all TableDescriptors
- Map<String, HTableDescriptor> descriptorMap = null;
- try {
- descriptorMap = this.tableDescriptors.getAll();
- } catch (IOException e) {
- LOG.warn("Failed getting all descriptors", e);
- }
- if (descriptorMap != null) {
- for(HTableDescriptor desc: descriptorMap.values()) {
- if(!desc.getTableName().isSystemTable()) {
- descriptors.add(desc);
- }
- }
- }
- } else {
- for (TableName s: tableNameList) {
- try {
- HTableDescriptor desc = this.tableDescriptors.get(s);
- if (desc != null) {
- descriptors.add(desc);
- }
- } catch (IOException e) {
- LOG.warn("Failed getting descriptor for " + s, e);
- }
- }
- }
-
- if (this.cpHost != null) {
- try {
- this.cpHost.postGetTableDescriptors(descriptors);
- } catch (IOException ioe) {
- throw new ServiceException(ioe);
- }
- }
- }
-
- GetTableDescriptorsResponse.Builder builder = GetTableDescriptorsResponse.newBuilder();
- for (HTableDescriptor htd: descriptors) {
- builder.addTableSchema(htd.convert());
- }
- return builder.build();
- }
-
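For the descriptor-listing RPC above, the usual client-side equivalent is listing userspace tables through the admin API. A hedged sketch, assuming the 0.98-era HBaseAdmin#listTables():

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class ListTablesExample {
  public static void main(String[] args) throws Exception {
    HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
    try {
      // Like the master-side method above, this lists userspace tables;
      // system tables such as hbase:meta are not included.
      for (HTableDescriptor desc : admin.listTables()) {
        System.out.println(desc.getTableName().getNameAsString());
      }
    } finally {
      admin.close();
    }
  }
}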
- /**
- * Get list of userspace table names
- * @param controller Unused (set to null).
- * @param req GetTableNamesRequest
- * @return GetTableNamesResponse
- * @throws ServiceException
- */
- @Override
- public GetTableNamesResponse getTableNames(
- RpcController controller, GetTableNamesRequest req) throws ServiceException {
- try {
- Collection<HTableDescriptor> descriptors = this.tableDescriptors.getAll().values();
- GetTableNamesResponse.Builder builder = GetTableNamesResponse.newBuilder();
- for (HTableDescriptor descriptor: descriptors) {
- if (descriptor.getTableName().isSystemTable()) {
- continue;
- }
- builder.addTableNames(ProtobufUtil.toProtoTableName(descriptor.getTableName()));
- }
- return builder.build();
[... 488 lines stripped ...]