Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/09/27 00:24:25 UTC

[GitHub] [hive] saihemanth-cloudera commented on a diff in pull request #3567: HIVE-26509: Introduce dynamic leader election in HMS

saihemanth-cloudera commented on code in PR #3567:
URL: https://github.com/apache/hive/pull/3567#discussion_r975629801


##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java:
##########
@@ -1523,6 +1531,11 @@ public enum ConfVars {
     TXN_USE_MIN_HISTORY_LEVEL("metastore.txn.use.minhistorylevel", "hive.txn.use.minhistorylevel", true,
         "Set this to false, for the TxnHandler and Cleaner to not use MinHistoryLevel table and take advantage of openTxn optimisation.\n"
             + "If the table is dropped HMS will switch this flag to false."),
+    LOCK_NUMRETRIES("metastore.lock.numretries", "hive.lock.numretries", 100,

Review Comment:
   I assume this lock setting is for polling to see whether a lock is available so that an instance can become the leader; is that correct? If so, should we rename it to make clear that it is for leader election? Also, why is the number of retries set to 100? Can you please explain?
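
   For context on the question above, here is a rough sketch of how a retry count and a sleep interval could bound lock polling. It is not the PR's code: `BoundedRetrySketch`, `acquireWithRetries`, and its parameters are hypothetical names, and the actual semantics of `metastore.lock.numretries` may differ.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration only; not taken from the PR.
final class BoundedRetrySketch {

  // Tries once, then retries up to maxRetries more times,
  // sleeping sleepMillis between attempts. Returns true on success.
  static boolean acquireWithRetries(BooleanSupplier tryAcquire, int maxRetries, long sleepMillis) {
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      if (tryAcquire.getAsBoolean()) {
        return true;
      }
      if (attempt < maxRetries) {
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
          // Preserve the interrupt flag and give up.
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // The simulated lock becomes available on the third attempt.
    boolean acquired = acquireWithRetries(() -> ++calls[0] == 3, 100, 10L);
    System.out.println("acquired=" + acquired + " after " + calls[0] + " attempts");
  }
}
```

   With this shape, the worst-case wait is roughly the retry count multiplied by the sleep interval, which is why the default of 100 matters.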



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java:
##########
@@ -891,33 +855,34 @@ public void run() {
             startCondition.await();
           }
 
-          if (isLeader) {
-            startCompactorInitiator(conf);
-            startCompactorCleaner(conf);
-            startRemoteOnlyTasks(conf);
-            startStatsUpdater(conf);
-            HMSHandler.startAlwaysTaskThreads(conf);
-          }
-
-          // The leader HMS may not necessarily have sufficient compute capacity required to run
-          // actual compaction work. So it can run on a non-leader HMS with sufficient capacity
-          // or a configured HS2 instance.
-          if (MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore")) {
-            LOG.warn("Running compaction workers on HMS side is not suggested because compaction pools are not supported in HMS " +
-                "(HIVE-26443). Consider removing the hive.metastore.runworker.in configuration setting, as it will be " +
-                "comletely removed in future releases.");
-            startCompactorWorkers(conf);
+         LeaderElectionContext context = new LeaderElectionContext.ContextBuilder(conf)
+             .setHMSHandler(thriftServer.getHandler()).servHost(getServerHostName())
+             // always tasks
+             .setTType(LeaderElectionContext.TTYPE.ALWAYS_TASKS)
+             .addListener(new HouseKeepingTasks(conf, false))
+             // housekeeping tasks
+             .setTType(LeaderElectionContext.TTYPE.HOUSEKEEPING)
+             .addListener(new CMClearer(conf))
+             .addListener(new StatsUpdaterTask(conf))
+             .addListener(new CompactorTasks(conf, false))
+             .addListener(new CompactorPMF())
+             .addListener(new HouseKeepingTasks(conf, true))

Review Comment:
   Do we need to include the partition discovery task here?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java:
##########
@@ -185,5 +188,20 @@ void searchHousekeepingThreads() throws Exception {
       }
     }
   }
+
+  private void resetThreadStatus() {
+    Map<String, Boolean> newThreadNames = new HashMap<>();
+    for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
+      newThreadNames.put(entry.getKey(), false);

Review Comment:
   Can we set the values of threadNames to false directly instead of copying them into a new variable?
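
   A minimal sketch of an in-place reset, assuming `threadNames` is a mutable `Map<String, Boolean>` as the quoted loop suggests. The class name and the sample keys are hypothetical; only `Map.replaceAll` is standard JDK API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the test base class; not taken from the PR.
final class ResetThreadStatusSketch {

  static final Map<String, Boolean> threadNames = new HashMap<>();

  // Resets every flag in place, so no temporary map is needed.
  static void resetThreadStatus() {
    threadNames.replaceAll((name, found) -> false);
  }

  public static void main(String[] args) {
    threadNames.put("Cleaner", true);
    threadNames.put("Initiator", false);
    resetThreadStatus();
    System.out.println(threadNames.containsValue(true)); // prints false
  }
}
```

   `entrySet().forEach(e -> e.setValue(false))` would work as well; both avoid modifying the key set while iterating.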



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
+import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestMetastoreLeaseNonLeader {
+
+  LeaderElection election;
+
+  TestMetastoreHousekeepingLeader hms;
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    TestTxnDbUtil.setConfValues(conf);
+    TestTxnDbUtil.prepDb(conf);
+    election = new LeaseLeaderElection();
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock");
+    TableName tableName = (TableName) LeaderElectionContext.getLeaderMutex(conf,
+        LeaderElectionContext.TTYPE.HOUSEKEEPING, null);
+    election.tryBeLeader(conf, tableName);
+    assertTrue("The elector should hold the lease now", election.isLeader());
+    // start the non-leader hms now
+    hms = new TestMetastoreHousekeepingLeader();
+    MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
+    hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true);
+    hms.internalSetup("", false);

Review Comment:
   Maybe we need an assertFalse or a log message to verify that hms is not the leader now.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java:
##########
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Hive Lock based leader election.
+ * If wins, the current instance becomes the leader,
+ * and a heartbeat daemon will be started to renew the lock before timeout.
+ * If loses, a non-leader watcher will also be started to check the
+ * lock periodically to see if he can grab the lock in order to be the leader.
+ * The change of Leadership can be received by registering the
+ * listeners through {@link LeaderElection#addStateListener}.
+ */
+public class LeaseLeaderElection implements LeaderElection<TableName> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaseLeaderElection.class);
+
+  private static final AtomicLong ID = new AtomicLong();
+
+  // Result of election
+  private volatile boolean isLeader;
+
+  private TxnStore store;
+
+  // Initial sleep time for locking the table at retrying.
+  private long nextSleep = 50;
+
+  // A daemon used for renewing the lock before timeout,
+  // this happens when the current instance wins the election.
+  private LeaseWatcher heartbeater;
+
+  // For non-leader instances to check the lock periodically to
+  // see if there is a chance to take over the leadership.
+  // At any time, only one of heartbeater and nonLeaderWatcher is alive.
+  private LeaseWatcher nonLeaderWatcher;
+
+  // Current lock id
+  private volatile long lockId = -1;
+
+  // Leadership change listeners
+  private List<LeadershipStateListener> listeners = new ArrayList<>();
+
+  // Property for testing only
+  public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";
+
+  private String name;
+
+  private void doWork(LockResponse resp, Configuration conf,
+      TableName tableName) throws LeaderException {
+    lockId = resp.getLockid();
+    assert resp.getState() == LockState.ACQUIRED || resp.getState() == LockState.WAITING;
+    shutdownWatcher();
+
+    switch (resp.getState()) {
+    case ACQUIRED:
+      boolean renewLease = conf.getBoolean(METASTORE_RENEW_LEASE, true);
+      heartbeater = renewLease ?
+          new Heartbeater(conf, tableName) : new ReleaseAndRequireWatcher(conf, tableName);
+      heartbeater.perform();
+      if (!isLeader) {
+        isLeader = true;
+        notifyListener();
+      }
+      break;
+    case WAITING:
+      nonLeaderWatcher = new NonLeaderWatcher(conf, tableName);
+      nonLeaderWatcher.perform();
+      if (isLeader) {
+        isLeader = false;
+        notifyListener();
+      }
+      break;
+    default:
+      throw new IllegalStateException("Unexpected lock state: " + resp.getState());
+    }
+  }
+
+  private void notifyListener() {
+    listeners.forEach(listener -> {
+      try {
+        if (isLeader) {
+          listener.takeLeadership(this);
+        } else {
+          listener.lossLeadership(this);

Review Comment:
   If the current instance isn't the leader, we are unnecessarily telling the listener to lose its leadership while it is still waiting to become the leader. I think we should call this only when the instance is actually losing its leadership role.
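
   A minimal sketch of notifying listeners only on an actual state change. `TransitionNotifySketch` and its nested `Listener` interface are hypothetical and only loosely modeled on `LeadershipStateListener`. Note that the quoted `doWork` already appears to guard the call with `if (isLeader)` in the WAITING branch, so this mainly illustrates the invariant being asked for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not taken from the PR.
final class TransitionNotifySketch {

  interface Listener {
    void takeLeadership();
    void lossLeadership();
  }

  private final List<Listener> listeners = new ArrayList<>();
  private boolean isLeader;

  void addListener(Listener listener) {
    listeners.add(listener);
  }

  // Fires callbacks only when the leadership state actually flips.
  void update(boolean nowLeader) {
    if (nowLeader == isLeader) {
      return; // no transition, nothing to announce
    }
    isLeader = nowLeader;
    for (Listener listener : listeners) {
      if (nowLeader) {
        listener.takeLeadership();
      } else {
        listener.lossLeadership();
      }
    }
  }

  public static void main(String[] args) {
    TransitionNotifySketch election = new TransitionNotifySketch();
    election.addListener(new Listener() {
      public void takeLeadership() { System.out.println("took leadership"); }
      public void lossLeadership() { System.out.println("lost leadership"); }
    });
    election.update(false); // still a non-leader: no callback
    election.update(true);  // callback: took leadership
    election.update(false); // callback: lost leadership
  }
}
```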



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/AuditLeaderListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static java.util.Objects.requireNonNull;
+
+public class AuditLeaderListener implements LeaderElection.LeadershipStateListener {
+  private final Configuration configuration;
+
+  private final Path tableLocation;
+
+  private final static String SERDE = "org.apache.hadoop.hive.serde2.JsonSerDe";
+  private final static String INPUTFORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+  private final static String OUTPUTFORMAT = "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat";
+
+  public AuditLeaderListener(TableName tableName, IHMSHandler handler) throws Exception {
+    requireNonNull(tableName, "tableName is null");
+    requireNonNull(handler, "handler is null");
+    this.configuration = handler.getConf();
+    try {
+      // store the leader info as json + text for human-readable
+      Table table = new TableBuilder()
+          .setCatName(tableName.getCat())
+          .setDbName(tableName.getDb())
+          .setTableName(tableName.getTable())
+          .addCol("leader", ColumnType.STRING_TYPE_NAME)
+          .addCol("name", ColumnType.STRING_TYPE_NAME)
+          .addCol("elected_time", ColumnType.BIGINT_TYPE_NAME)
+          .setSerdeLib(SERDE)
+          .setInputFormat(INPUTFORMAT)
+          .setOutputFormat(OUTPUTFORMAT)
+          .build(handler.getConf());
+      handler.create_table(table);

Review Comment:
   I think we need to set the owner of this table, right? Also, is there a way to recreate this table if it is dropped for some reason?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/AuditLeaderListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static java.util.Objects.requireNonNull;
+
+public class AuditLeaderListener implements LeaderElection.LeadershipStateListener {
+  private final Configuration configuration;
+
+  private final Path tableLocation;
+
+  private final static String SERDE = "org.apache.hadoop.hive.serde2.JsonSerDe";
+  private final static String INPUTFORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+  private final static String OUTPUTFORMAT = "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat";
+
+  public AuditLeaderListener(TableName tableName, IHMSHandler handler) throws Exception {
+    requireNonNull(tableName, "tableName is null");
+    requireNonNull(handler, "handler is null");
+    this.configuration = handler.getConf();
+    try {
+      // store the leader info as json + text for human-readable
+      Table table = new TableBuilder()
+          .setCatName(tableName.getCat())
+          .setDbName(tableName.getDb())
+          .setTableName(tableName.getTable())
+          .addCol("leader", ColumnType.STRING_TYPE_NAME)
+          .addCol("name", ColumnType.STRING_TYPE_NAME)
+          .addCol("elected_time", ColumnType.BIGINT_TYPE_NAME)
+          .setSerdeLib(SERDE)
+          .setInputFormat(INPUTFORMAT)
+          .setOutputFormat(OUTPUTFORMAT)
+          .build(handler.getConf());
+      handler.create_table(table);
+    } catch (AlreadyExistsException e) {
+      // ignore
+    }
+
+    Table table = handler.getMS().getTable(tableName.getCat(),
+        tableName.getDb(), tableName.getTable());
+    this.tableLocation = new Path(table.getSd().getLocation());
+    String serde = table.getSd().getSerdeInfo().getSerializationLib();
+    String input = table.getSd().getInputFormat();
+    String output = table.getSd().getOutputFormat();
+    if (!SERDE.equals(serde) || !INPUTFORMAT.equals(input)
+        || !OUTPUTFORMAT.equals(output)) {
+      throw new RuntimeException(tableName + " should be in json + text format");
+    }
+
+  }
+
+  @Override
+  public void takeLeadership(LeaderElection election) throws Exception {
+    String hostName = getHostname();
+    String message = "{\"leader\": \"" + hostName + "\", \"name\": \""
+        + election.getName() + "\", \"elected_time\": " + System.currentTimeMillis() + "} \n";
+    Path path = new Path(tableLocation, "leader.json");
+    try {
+      FileSystem fs = Warehouse.getFs(path, configuration);
+      try (OutputStream outputStream = fs.exists(path) ?
+          fs.append(path) :
+          fs.create(path, false)) {
+        outputStream.write(message.getBytes(StandardCharsets.UTF_8));

Review Comment:
   Why are we trying to persist leader election info under the table directory instead of adding an entry to the table?



##########
ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java:
##########
@@ -159,6 +159,7 @@ public void run() {
         return;
       }
     }
+    stopWorkers();

Review Comment:
   Why do we need this, when we already have L158?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java:
##########
@@ -58,7 +58,8 @@ public void testHouseKeepingThreadExistence() throws Exception {
         if (entry.getValue()) {
           LOG.info("Thread found for " + entry.getKey().getSimpleName());
         }
-        Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());
+        // Worker now runs on a leader hms
+        // Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());

Review Comment:
   Nit: Can you remove the commented-out code?



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java:
##########
@@ -562,6 +562,14 @@ public enum ConfVars {
         "match that configuration. Otherwise it should be same as the hostname returned by " +
         "InetAddress#getLocalHost#getHostName(). Given the uncertainty in the later " +
         "it is desirable to configure metastore.thrift.bind.host on the intended leader HMS."),
+    METASTORE_HOUSEKEEPING_LEADER_ELECTION("metastore.housekeeping.leader.election",
+        "metastore.housekeeping.leader.election",

Review Comment:
   Should this be hive.metastore.leader.election? Similarly, should the 'hive.' prefix be added to L571?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
+import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMetastoreLeaseLeader {
+
+  LeaderElection election;
+
+  TestMetastoreHousekeepingLeader hms;
+
+  @Before
+  public void setUp() throws Exception {
+    hms = new TestMetastoreHousekeepingLeader();
+    MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
+    hms.conf.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false);

Review Comment:
   Should we add this config to MetastoreConf instead of the LeaseLeaderElection class?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java:
##########
@@ -58,7 +58,8 @@ public void testHouseKeepingThreadExistence() throws Exception {
         if (entry.getValue()) {
           LOG.info("Thread found for " + entry.getKey().getSimpleName());
         }
-        Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());
+        // Worker now runs on a leader hms
+        // Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());

Review Comment:
   You can remove L62, as it is a commented-out line.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java:
##########
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Hive Lock based leader election.
+ * If wins, the current instance becomes the leader,
+ * and a heartbeat daemon will be started to renew the lock before timeout.
+ * If loses, a non-leader watcher will also be started to check the
+ * lock periodically to see if he can grab the lock in order to be the leader.
+ * The change of Leadership can be received by registering the
+ * listeners through {@link LeaderElection#addStateListener}.
+ */
+public class LeaseLeaderElection implements LeaderElection<TableName> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaseLeaderElection.class);
+
+  private static final AtomicLong ID = new AtomicLong();
+
+  // Result of election
+  private volatile boolean isLeader;
+
+  private TxnStore store;
+
+  // Initial sleep time for locking the table at retrying.
+  private long nextSleep = 50;
+
+  // A daemon used for renewing the lock before timeout,
+  // this happens when the current instance wins the election.
+  private LeaseWatcher heartbeater;
+
+  // For non-leader instances to check the lock periodically to
+  // see if there is a chance to take over the leadership.
+  // At any time, only one of heartbeater and nonLeaderWatcher is alive.
+  private LeaseWatcher nonLeaderWatcher;
+
+  // Current lock id
+  private volatile long lockId = -1;
+
+  // Leadership change listeners
+  private List<LeadershipStateListener> listeners = new ArrayList<>();
+
+  // Property for testing only
+  public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";

Review Comment:
   I don't think this property is just for testing; it is used in the actual feature. Is that correct?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection.LeadershipStateListener;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class LeaderElectionContext {
+
+  /**
+   * Types of tasks that exactly have one instance in a given warehouse.
+   * For those tasks which belong to the same type, they will be running in the same leader.
+   */
+  public enum TTYPE {
+    HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
+        "metastore_housekeeping_leader"), "housekeeping"),
+    WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
+        "metastore_worker_leader"), "compactor_worker"),
+    ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
+        "metastore_always_tasks_leader"), "always_tasks");
+    // Mutex of TTYPE, which can be a nonexistent table
+    private final TableName mutex;
+    // Name of TTYPE
+    private final String name;
+
+    TTYPE(TableName tableName, String name) {
+      this.mutex = tableName;
+      this.name  = name;
+    }
+    public TableName getTableName() {
+      return mutex;
+    }
+    public String getName() {
+      return name;
+    }
+  }
+
+  private final Configuration conf;
+  private final String servHost;
+  // Whether the context should be started as a daemon
+  private final boolean startAsDaemon;
+  // Audit the event of election
+  private AuditLeaderListener auditLeaderListener;
+  // State change listeners group by type
+  private final Map<TTYPE, List<LeadershipStateListener>> listeners;
+  // Collection of leader candidates
+  private final List<LeaderElection> leaderElections = new ArrayList<>();
+  // Property for testing, a single leader will be created
+  public final static String LEADER_IN_TEST = "metastore.leader.election.in.test";
+
+  private LeaderElectionContext(String servHost, Configuration conf,
+      Map<TTYPE, List<LeadershipStateListener>> listeners,
+      boolean startAsDaemon, IHMSHandler handler) throws Exception {
+    requireNonNull(conf, "conf is null");
+    requireNonNull(listeners, "listeners is null");
+    this.servHost = servHost;
+    this.conf = new Configuration(conf);
+    this.startAsDaemon = startAsDaemon;
+    String tableName = MetastoreConf.getVar(conf,
+        MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_AUDITTABLE);
+    if (StringUtils.isNotEmpty(tableName)) {
+      TableName table = TableName.fromString(tableName, MetaStoreUtils.getDefaultCatalog(conf),
+          Warehouse.DEFAULT_DATABASE_NAME);

Review Comment:
   Should the audit table be in the sys db instead of the default database?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

