You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/02/22 18:48:26 UTC

svn commit: r1292420 [3/3] - in /hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore: HiveMetaStore.java RetryingRawStore.java events/EventCleanerTask.java

Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java?rev=1292420&view=auto
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java (added)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java Wed Feb 22 17:48:25 2012
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * {@link InvocationHandler} that wraps a {@link RawStore} and retries calls
+ * that fail with a {@link javax.jdo.JDOException}, up to the limit read from
+ * {@code METASTOREATTEMPTS}, sleeping {@code METASTOREINTERVAL} ms in between.
+ * After each failure the configured {@link JDOConnectionURLHook}, if any, is
+ * asked for a connection URL; when it differs from the current one the wrapped
+ * store is re-configured before the next attempt.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RetryingRawStore implements InvocationHandler {
+
+  private static final Log LOG = LogFactory.getLog(RetryingRawStore.class);
+
+  private final RawStore base;
+  private int retryInterval = 0;
+  private int retryLimit = 0;
+  private JDOConnectionURLHook urlHook = null;
+  private String urlHookClassName = "";
+  private final int id;
+  private final HiveConf hiveConf;
+  private final Configuration conf; // thread local conf from HMS
+
+  protected RetryingRawStore(HiveConf hiveConf, Configuration conf, RawStore base, int id)
+      throws MetaException {
+    this.base = base;
+    this.conf = conf;
+    this.hiveConf = hiveConf;
+    this.id = id;
+    init();
+  }
+
+  /**
+   * Creates a dynamic proxy implementing the interfaces declared directly by
+   * the class of {@code base}, with every call routed through a new
+   * {@link RetryingRawStore} handler.
+   */
+  public static RawStore getProxy(HiveConf hiveConf, Configuration conf, RawStore base, int id)
+      throws MetaException {
+
+    RetryingRawStore handler = new RetryingRawStore(hiveConf, conf, base, id);
+
+    return (RawStore) Proxy.newProxyInstance(RetryingRawStore.class.getClassLoader()
+        , base.getClass().getInterfaces(), handler);
+  }
+
+  private void init() throws MetaException {
+    retryInterval = HiveConf.getIntVar(hiveConf,
+        HiveConf.ConfVars.METASTOREINTERVAL);
+    retryLimit = HiveConf.getIntVar(hiveConf,
+        HiveConf.ConfVars.METASTOREATTEMPTS);
+    // Using the hook on startup ensures that the hook always has priority
+    // over settings in *.xml. We can use hiveConf as only a single thread
+    // will be calling the constructor.
+    updateConnectionURL(hiveConf, null);
+  }
+
+  private void initMS() {
+    base.setConf(getConf());
+  }
+
+  /**
+   * Invokes {@code method} on the wrapped store, retrying on JDO failures.
+   *
+   * @throws Throwable the cause of any non-JDO failure raised by the target
+   *         method, or the last JDO exception once the retry limit is reached
+   */
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+    Object ret = null;
+
+    boolean gotNewConnectUrl = false;
+    boolean reloadConf = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.METASTOREFORCERELOADCONF);
+
+    if (reloadConf) {
+      updateConnectionURL(getConf(), null);
+    }
+
+    int retryCount = 0;
+    Exception caughtException = null;
+    while (true) {
+      try {
+        if (reloadConf || gotNewConnectUrl) {
+          initMS();
+        }
+        ret = method.invoke(base, args);
+        break;
+      } catch (javax.jdo.JDOException e) {
+        // Reaches here unwrapped only from initMS(); see below for method.invoke.
+        caughtException = e;
+      } catch (UndeclaredThrowableException e) {
+        throw e.getCause();
+      } catch (InvocationTargetException e) {
+        // Method.invoke wraps whatever the target method throws, so a JDO
+        // failure from the store arrives here and must be unwrapped in order
+        // to be retried; anything else is rethrown to the caller as-is.
+        Throwable cause = e.getCause();
+        if (!(cause instanceof javax.jdo.JDOException)) {
+          throw cause;
+        }
+        caughtException = (javax.jdo.JDOException) cause;
+      }
+
+      if (retryCount >= retryLimit) {
+        throw caughtException;
+      }
+
+      assert (retryInterval >= 0);
+      retryCount++;
+      LOG.error(
+          String.format(
+              "JDO datastore error. Retrying metastore command " +
+                  "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit),
+          caughtException);
+      Thread.sleep(retryInterval);
+      // If we have a connection error, the JDO connection URL hook might
+      // provide us with a new URL to access the datastore.
+      String lastUrl = getConnectionURL(getConf());
+      gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);
+    }
+    return ret;
+  }
+
+  /**
+   * Asks the connection URL hook, if one is configured, for the JDO connection
+   * URL and stores it in {@code conf} when it differs from the current value.
+   * Failures while loading or calling the hook are logged, not propagated.
+   *
+   * @param conf configuration whose connection URL is read and updated
+   * @param badUrl URL to report to the hook as failing, or null for none
+   * @return true if a new connection URL was loaded into {@code conf}
+   */
+  private boolean updateConnectionURL(Configuration conf, String badUrl)
+      throws MetaException {
+    String connectUrl = null;
+    String currentUrl = getConnectionURL(conf);
+    try {
+      // We always call init because the hook name in the configuration could
+      // have changed.
+      initConnectionUrlHook();
+      if (urlHook != null) {
+        if (badUrl != null) {
+          urlHook.notifyBadConnectionUrl(badUrl);
+        }
+        connectUrl = urlHook.getJdoConnectionUrl(hiveConf);
+      }
+    } catch (Exception e) {
+      // Pass the exception itself so the stack trace is not lost.
+      LOG.error("Exception while getting connection URL from the hook: " + e, e);
+    }
+
+    if (connectUrl != null && !connectUrl.equals(currentUrl)) {
+      LOG.error(addPrefix(
+          String.format("Overriding %s with %s",
+              HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),
+              connectUrl)));
+      conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),
+          connectUrl);
+      return true;
+    }
+    return false;
+  }
+
+  private static String getConnectionURL(Configuration conf) {
+    return conf.get(
+        HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), "");
+  }
+
+  // Multiple threads could try to initialize at the same time.
+  synchronized private void initConnectionUrlHook()
+      throws ClassNotFoundException {
+
+    String className =
+        hiveConf.get(HiveConf.ConfVars.METASTORECONNECTURLHOOK.toString(), "").trim();
+    if (className.equals("")) {
+      urlHookClassName = "";
+      urlHook = null;
+      return;
+    }
+    boolean urlHookChanged = !urlHookClassName.equals(className);
+    if (urlHook == null || urlHookChanged) {
+      // Instantiate before touching any field: if loading fails, the previous
+      // hook and its recorded class name stay paired and the next call retries.
+      Class<?> urlHookClass = Class.forName(className, true,
+          JavaUtils.getClassLoader());
+      urlHook = (JDOConnectionURLHook) ReflectionUtils.newInstance(urlHookClass, null);
+      urlHookClassName = className;
+    }
+  }
+
+  private String addPrefix(String s) {
+    return id + ": " + s;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+}

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java?rev=1292420&r1=1292419&r2=1292420&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java Wed Feb 22 17:48:25 2012
@@ -23,7 +23,6 @@ import java.util.TimerTask;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
 import org.apache.hadoop.hive.metastore.RawStore;
 
 public class EventCleanerTask extends TimerTask{
@@ -40,12 +39,9 @@ public class EventCleanerTask extends Ti
   public void run() {
 
     try {
-      long deleteCnt = handler.executeWithRetry(new Command<Long>(){
-        @Override
-        public Long run(RawStore ms) throws Exception {
-          return ms.cleanupEvents();
-        }
-      });
+      RawStore ms = handler.getMS();
+      long deleteCnt = ms.cleanupEvents();
+
       if (deleteCnt > 0L){
         LOG.info("Number of events deleted from event Table: "+deleteCnt);
       }