You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2017/07/24 20:11:32 UTC

hive git commit: HIVE-17117: Metalisteners are not notified when threadlocal metaconf is cleanup (Prashant Golash, reviewed by Mohit, Chao and Zheng)

Repository: hive
Updated Branches:
  refs/heads/master 9a8533148 -> 764c3a5a5


HIVE-17117: Metalisteners are not notified when threadlocal metaconf is cleanup (Prashant Golash, reviewed by Mohit, Chao and Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/764c3a5a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/764c3a5a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/764c3a5a

Branch: refs/heads/master
Commit: 764c3a5a509f9d0ca9e7259aa0a6bf4ad23292d1
Parents: 9a85331
Author: Prashant Golash <pr...@gmail.com>
Authored: Mon Jul 24 13:11:00 2017 -0700
Committer: Chao Sun <su...@apache.org>
Committed: Mon Jul 24 13:11:07 2017 -0700

----------------------------------------------------------------------
 .../metastore/TestMetaStoreEventListener.java   |  70 ++++++++++-
 .../hadoop/hive/metastore/HiveMetaStore.java    | 121 ++++++++++++++++---
 2 files changed, 169 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/764c3a5a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
index fd4527e..f384991 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
@@ -81,6 +81,8 @@ public class TestMetaStoreEventListener extends TestCase {
   private static final String dbName = "hive2038";
   private static final String tblName = "tmptbl";
   private static final String renamed = "tmptbl2";
+  private static final String metaConfKey = "hive.metastore.partition.name.whitelist.pattern";
+  private static final String metaConfVal = "";
 
   @Override
   protected void setUp() throws Exception {
@@ -93,9 +95,11 @@ public class TestMetaStoreEventListener extends TestCase {
         DummyPreListener.class.getName());
 
     int port = MetaStoreUtils.findFreePort();
-    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge());
-
     hiveConf = new HiveConf(this.getClass());
+
+    hiveConf.setVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN, metaConfVal);
+    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf);
+
     hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
     hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
@@ -455,4 +459,66 @@ public class TestMetaStoreEventListener extends TestCase {
     assertEquals("true", event.getOldValue());
     assertEquals("false", event.getNewValue());
   }
+
+  public void testMetaConfNotifyListenersClosingClient() throws Exception {
+    HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null);
+    closingClient.setMetaConf(metaConfKey, "[test pattern modified]");
+    ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent();
+    assertEquals(event.getOldValue(), metaConfVal);
+    assertEquals(event.getNewValue(), "[test pattern modified]");
+    closingClient.close();
+
+    Thread.sleep(5 * 1000);
+
+    event = (ConfigChangeEvent) DummyListener.getLastEvent();
+    assertEquals(event.getOldValue(), "[test pattern modified]");
+    assertEquals(event.getNewValue(), metaConfVal);
+  }
+
+  public void testMetaConfNotifyListenersNonClosingClient() throws Exception {
+    HiveMetaStoreClient nonClosingClient = new HiveMetaStoreClient(hiveConf, null);
+    nonClosingClient.setMetaConf(metaConfKey, "[test pattern modified]");
+    ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent();
+    assertEquals(event.getOldValue(), metaConfVal);
+    assertEquals(event.getNewValue(), "[test pattern modified]");
+    // This should also trigger meta listener notification via TServerEventHandler#deleteContext
+    nonClosingClient.getTTransport().close();
+
+    Thread.sleep(5 * 1000);
+
+    event = (ConfigChangeEvent) DummyListener.getLastEvent();
+    assertEquals(event.getOldValue(), "[test pattern modified]");
+    assertEquals(event.getNewValue(), metaConfVal);
+  }
+
+  public void testMetaConfDuplicateNotification() throws Exception {
+    HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null);
+    closingClient.setMetaConf(metaConfKey, metaConfVal);
+    int beforeCloseNotificationEventCounts = DummyListener.notifyList.size();
+    closingClient.close();
+
+    Thread.sleep(5 * 1000);
+
+    int afterCloseNotificationEventCounts = DummyListener.notifyList.size();
+    // Setting key to same value, should not trigger configChange event during shutdown
+    assertEquals(beforeCloseNotificationEventCounts, afterCloseNotificationEventCounts);
+  }
+
+  public void testMetaConfSameHandler() throws Exception {
+    HiveMetaStoreClient closingClient = new HiveMetaStoreClient(hiveConf, null);
+    closingClient.setMetaConf(metaConfKey, "[test pattern modified]");
+    ConfigChangeEvent event = (ConfigChangeEvent) DummyListener.getLastEvent();
+    int beforeCloseNotificationEventCounts = DummyListener.notifyList.size();
+    HiveMetaStore.HMSHandler beforeHandler = event.getHandler();
+    closingClient.close();
+
+    Thread.sleep(5 * 1000);
+    event = (ConfigChangeEvent) DummyListener.getLastEvent();
+    int afterCloseNotificationEventCounts = DummyListener.notifyList.size();
+    HiveMetaStore.HMSHandler afterHandler = event.getHandler();
+    // Meta-conf cleanup should trigger an event to listener
+    assertNotSame(beforeCloseNotificationEventCounts, afterCloseNotificationEventCounts);
+    // Both the handlers should be same
+    assertEquals(beforeHandler, afterHandler);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/764c3a5a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 58b9044..bbbcebe 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -40,6 +40,8 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
@@ -275,6 +277,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         };
 
+    /**
+     * Thread local HMSHandler used during shutdown to notify meta listeners
+     */
+    private static final ThreadLocal<HMSHandler> threadLocalHMSHandler = new ThreadLocal<>();
+
+    /**
+     * Thread local Map to keep track of modified meta conf keys
+     */
+    private static final ThreadLocal<Map<String, String>> threadLocalModifiedConfig =
+        new ThreadLocal<>();
+
     private static ExecutorService threadPool;
 
     public static final String AUDIT_FORMAT =
@@ -344,6 +357,55 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
     };
 
+    /**
+     * Internal function to notify listeners for meta config change events
+     */
+    private void notifyMetaListeners(String key, String oldValue, String newValue) throws MetaException {
+      for (MetaStoreEventListener listener : listeners) {
+        listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, newValue));
+      }
+
+      if (transactionalListeners.size() > 0) {
+        // All the fields of this event are final, so no reason to create a new one for each
+        // listener
+        ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, newValue);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onConfigChange(cce);
+        }
+      }
+    }
+
+    /**
+     * Internal function to notify listeners to revert back to old values of keys
+     * that were modified during setMetaConf. This would get called from HiveMetaStore#cleanupRawStore
+     */
+    private void notifyMetaListenersOnShutDown() {
+      Map<String, String> modifiedConf = threadLocalModifiedConfig.get();
+      if (modifiedConf == null) {
+        // Nothing got modified
+        return;
+      }
+      try {
+        Configuration conf = threadLocalConf.get();
+        if (conf == null) {
+          throw new MetaException("Unexpected: modifiedConf is non-null but conf is null");
+        }
+        // Notify listeners of the changed value
+        for (Entry<String, String> entry : modifiedConf.entrySet()) {
+          String key = entry.getKey();
+          // curr value becomes old and vice-versa
+          String currVal = entry.getValue();
+          String oldVal = conf.get(key);
+          if (!Objects.equals(oldVal, currVal)) {
+            notifyMetaListeners(key, oldVal, currVal);
+          }
+        }
+        logInfo("Meta listeners shutdown notification completed.");
+      } catch (MetaException e) {
+        LOG.error("Failed to notify meta listeners on shutdown: ", e);
+      }
+    }
+
     public static void setThreadLocalIpAddress(String ipAddress) {
       threadLocalIpAddress.set(ipAddress);
     }
@@ -513,6 +575,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return threadLocalId.get() + ": " + s;
     }
 
+    /**
+     * Set copy of invoking HMSHandler on thread local
+     */
+    private static void setHMSHandler(HMSHandler handler) {
+      if (threadLocalHMSHandler.get() == null) {
+        threadLocalHMSHandler.set(handler);
+      }
+    }
     @Override
     public void setConf(Configuration conf) {
       threadLocalConf.set(conf);
@@ -532,6 +602,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return conf;
     }
 
+    private Map<String, String> getModifiedConf() {
+      Map<String, String> modifiedConf = threadLocalModifiedConfig.get();
+      if (modifiedConf == null) {
+        modifiedConf = new HashMap<String, String>();
+        threadLocalModifiedConfig.set(modifiedConf);
+      }
+      return modifiedConf;
+    }
+
     public Warehouse getWh() {
       return wh;
     }
@@ -549,20 +628,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
       Configuration configuration = getConf();
       String oldValue = configuration.get(key);
+      // Save prev val of the key on threadLocal
+      Map<String, String> modifiedConf = getModifiedConf();
+      if (!modifiedConf.containsKey(key)) {
+        modifiedConf.put(key, oldValue);
+      }
+      // Set invoking HMSHandler on threadLocal, this will be used later to notify
+      // metaListeners in HiveMetaStore#cleanupRawStore
+      setHMSHandler(this);
       configuration.set(key, value);
-
-      for (MetaStoreEventListener listener : listeners) {
-        listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, value));
-      }
-
-      if (transactionalListeners.size() > 0) {
-        // All the fields of this event are final, so no reason to create a new one for each
-        // listener
-        ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, value);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onConfigChange(cce);
-        }
-      }
+      notifyMetaListeners(key, oldValue, value);
     }
 
     @Override
@@ -7509,15 +7584,21 @@ public class HiveMetaStore extends ThriftHiveMetastore {
   }
 
   private static void cleanupRawStore() {
-    RawStore rs = HMSHandler.getRawStore();
-    if (rs != null) {
-      HMSHandler.logInfo("Cleaning up thread local RawStore...");
-      try {
+    try {
+      RawStore rs = HMSHandler.getRawStore();
+      if (rs != null) {
+        HMSHandler.logInfo("Cleaning up thread local RawStore...");
         rs.shutdown();
-      } finally {
-        HMSHandler.threadLocalConf.remove();
-        HMSHandler.removeRawStore();
       }
+    } finally {
+      HMSHandler handler = HMSHandler.threadLocalHMSHandler.get();
+      if (handler != null) {
+        handler.notifyMetaListenersOnShutDown();
+      }
+      HMSHandler.threadLocalHMSHandler.remove();
+      HMSHandler.threadLocalConf.remove();
+      HMSHandler.threadLocalModifiedConfig.remove();
+      HMSHandler.removeRawStore();
       HMSHandler.logInfo("Done cleaning up thread local RawStore");
     }
   }