You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/06 18:53:04 UTC

incubator-gobblin git commit: [GOBBLIN-557] Reuse HiveConf object by resource broker

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 28e3aece7 -> 25530f075


[GOBBLIN-557] Reuse HiveConf object by resource broker

Closes #2418 from
autumnust/fixHiveConfReinstantiate


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/25530f07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/25530f07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/25530f07

Branch: refs/heads/master
Commit: 25530f0755a336ebc86f482f0496459a8cc033fc
Parents: 28e3aec
Author: Lei Sun <au...@gmail.com>
Authored: Mon Aug 6 11:46:21 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Aug 6 11:46:21 2018 -0700

----------------------------------------------------------------------
 .../apache/gobblin/hive/HiveConfFactory.java    | 57 ++++++++++++++++++++
 .../hive/HiveMetaStoreClientFactory.java        | 29 ++++++----
 .../hive/metastore/HiveMetaStoreUtils.java      | 14 +++--
 .../gobblin/hive/HiveConfFactoryTest.java       | 38 +++++++++++++
 4 files changed, 126 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java
new file mode 100644
index 0000000..447fc80
--- /dev/null
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * A {@link SharedResourceFactory} that creates a default {@link HiveConf} and auto-scopes it to the root scope.
+ * It is keyed by {@link EmptyKey}, so callers share one instance and any mutation of it is visible to all of them.
+ */
+public class HiveConfFactory<S extends ScopeType<S>> implements SharedResourceFactory<HiveConf, EmptyKey, S> {
+  public static final String FACTORY_NAME = "hiveConfFactory";
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public SharedResourceFactoryResponse<HiveConf> createResource(SharedResourcesBroker<S> broker,
+      ScopedConfigView<S, EmptyKey> config)
+      throws NotConfiguredException {
+    // Uses the no-arg HiveConf constructor; neither broker nor config is consulted. Arguments may be added later.
+    return new ResourceInstance<>(new HiveConf());
+  }
+
+  @Override
+  public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, EmptyKey> config) {
+    return broker.selfScope().getType().rootScope();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
index 772b5d0..9907135 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.hive;
 
-import lombok.Getter;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
@@ -30,8 +28,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-
-import com.google.common.base.Optional;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -39,12 +35,22 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
 
 /**
  * An implementation of {@link BasePooledObjectFactory} for {@link IMetaStoreClient}.
  */
+@Slf4j
 public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreClientFactory.class);
@@ -58,12 +64,17 @@ public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaSto
   }
 
   private static HiveConf getHiveConf(Optional<String> hcatURI) {
-    HiveConf hiveConf = new HiveConf();
-    if (hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get())) {
-      hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hcatURI.get());
-      hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, hcatURI.get());
+    try {
+      HiveConf hiveConf = SharedResourcesBrokerFactory.getImplicitBroker()
+          .getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
+      if (hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get())) {
+        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hcatURI.get());
+        hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, hcatURI.get());
+      }
+      return hiveConf;
+    } catch (NotConfiguredException nce) {
+      throw new RuntimeException("Implicit broker is not correctly configured, failed to fetch a HiveConf object", nce);
     }
-    return hiveConf;
   }
 
   public HiveMetaStoreClientFactory(HiveConf hiveConf) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 61d3f5f..bdf8800 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.hive.metastore;
 
-import com.google.common.base.Splitter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -49,12 +48,17 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveConfFactory;
 import org.apache.gobblin.hive.HiveConstants;
 import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.HiveRegistrationUnit;
@@ -370,15 +374,19 @@ public class HiveMetaStoreUtils {
     }
 
     String serde = serdeClass.get();
-    HiveConf hiveConf = new HiveConf();
-
+    HiveConf hiveConf;
     Deserializer deserializer;
     try {
+      hiveConf = SharedResourcesBrokerFactory
+        .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
       deserializer =
           ReflectionUtils.newInstance(hiveConf.getClassByName(serde).asSubclass(Deserializer.class), hiveConf);
     } catch (ClassNotFoundException e) {
       LOG.warn("Serde class " + serde + " not found!", e);
       return null;
+    } catch (NotConfiguredException nce) {
+      LOG.error("Implicit broker is not configured properly", nce);
+      return null;
     }
 
     Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java
new file mode 100644
index 0000000..de3ac6e
--- /dev/null
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+
+
+/**
+ * Verifies that {@link HiveConfFactory} hands out one shared {@link HiveConf} through the implicit broker.
+ */
+public class HiveConfFactoryTest {
+  /**
+   * Two lookups with the same {@link EmptyKey} must return the very same object, not merely an equal one.
+   */
+  @Test
+  public void testSameKey() throws Exception {
+    HiveConf hiveConf = SharedResourcesBrokerFactory
+        .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
+    HiveConf hiveConf1 = SharedResourcesBrokerFactory
+        .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
+
+    // Reuse means reference identity, so assert sameness rather than equality.
+    Assert.assertSame(hiveConf, hiveConf1);
+  }
+}
\ No newline at end of file