You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/06 18:53:04 UTC
incubator-gobblin git commit: [GOBBLIN-557] Reuse HiveConf object by
resource broker
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 28e3aece7 -> 25530f075
[GOBBLIN-557] Reuse HiveConf object by resource broker
Closes #2418 from
autumnust/fixHiveConfReinstantiate
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/25530f07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/25530f07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/25530f07
Branch: refs/heads/master
Commit: 25530f0755a336ebc86f482f0496459a8cc033fc
Parents: 28e3aec
Author: Lei Sun <au...@gmail.com>
Authored: Mon Aug 6 11:46:21 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Aug 6 11:46:21 2018 -0700
----------------------------------------------------------------------
.../apache/gobblin/hive/HiveConfFactory.java | 57 ++++++++++++++++++++
.../hive/HiveMetaStoreClientFactory.java | 29 ++++++----
.../hive/metastore/HiveMetaStoreUtils.java | 14 +++--
.../gobblin/hive/HiveConfFactoryTest.java | 38 +++++++++++++
4 files changed, 126 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java
new file mode 100644
index 0000000..447fc80
--- /dev/null
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConfFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * A {@link SharedResourceFactory} that builds a default {@link HiveConf}, auto-scoped to the broker's root scope.
+ * {@link EmptyKey} means the resource is unparameterized. NOTE(review): callers should treat it as read-only.
+ */
+public class HiveConfFactory<S extends ScopeType<S>> implements SharedResourceFactory<HiveConf, EmptyKey, S> {
+ public static final String FACTORY_NAME = "hiveConfFactory";
+
+ @Override
+ public String getName() {
+ return FACTORY_NAME;
+ }
+
+ @Override
+ public SharedResourceFactoryResponse<HiveConf> createResource(SharedResourcesBroker<S> broker,
+ ScopedConfigView<S, EmptyKey> config)
+ throws NotConfiguredException {
+ // Neither broker nor config is consulted here; HiveConf constructor arguments could be supported in the future.
+ return new ResourceInstance<>(new HiveConf());
+ }
+
+ @Override
+ public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, EmptyKey> config) {
+ return broker.selfScope().getType().rootScope();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
index 772b5d0..9907135 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveMetaStoreClientFactory.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.hive;
-import lombok.Getter;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
@@ -30,8 +28,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
-
-import com.google.common.base.Optional;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
@@ -39,12 +35,22 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
/**
* An implementation of {@link BasePooledObjectFactory} for {@link IMetaStoreClient}.
*/
+@Slf4j
public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> {
private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreClientFactory.class);
@@ -58,12 +64,17 @@ public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaSto
}
+ /**
+ * Fetches the {@link HiveConf} shared through the implicit broker. When {@code hcatURI} is present and
+ * non-blank, a private copy carrying that URI as metastore URI and token signature is returned instead,
+ * so the shared instance is never modified.
+ *
+ * @param hcatURI optional metastore URI to apply on top of the shared configuration
+ * @return the shared {@link HiveConf}, or a copy of it with the URI overrides applied
+ * @throws RuntimeException if the implicit broker cannot provide a {@link HiveConf}
+ */
private static HiveConf getHiveConf(Optional<String> hcatURI) {
- HiveConf hiveConf = new HiveConf();
- if (hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get())) {
- hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hcatURI.get());
- hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, hcatURI.get());
+ try {
+ HiveConf hiveConf = SharedResourcesBrokerFactory.getImplicitBroker()
+ .getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
+ if (hcatURI.isPresent() && StringUtils.isNotBlank(hcatURI.get())) {
+ // Work on a private copy: URI-specific settings must not leak into the instance other callers share.
+ hiveConf = new HiveConf(hiveConf);
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hcatURI.get());
+ hiveConf.set(HIVE_METASTORE_TOKEN_SIGNATURE, hcatURI.get());
+ }
+ return hiveConf;
+ } catch (NotConfiguredException nce) {
+ throw new RuntimeException("Implicit broker is not correctly configured, failed to fetch a HiveConf object", nce);
}
- return hiveConf;
}
public HiveMetaStoreClientFactory(HiveConf hiveConf) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 61d3f5f..bdf8800 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.hive.metastore;
-import com.google.common.base.Splitter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -49,12 +48,17 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveConfFactory;
import org.apache.gobblin.hive.HiveConstants;
import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.HiveRegistrationUnit;
@@ -370,15 +374,19 @@ public class HiveMetaStoreUtils {
}
String serde = serdeClass.get();
- HiveConf hiveConf = new HiveConf();
-
+ HiveConf hiveConf;
Deserializer deserializer;
try {
+ hiveConf = SharedResourcesBrokerFactory
+ .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
deserializer =
ReflectionUtils.newInstance(hiveConf.getClassByName(serde).asSubclass(Deserializer.class), hiveConf);
} catch (ClassNotFoundException e) {
LOG.warn("Serde class " + serde + " not found!", e);
return null;
+ } catch (NotConfiguredException nce) {
+ LOG.error("Implicit broker is not configured properly", nce);
+ return null;
}
Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25530f07/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java
new file mode 100644
index 0000000..de3ac6e
--- /dev/null
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/HiveConfFactoryTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+
+
+public class HiveConfFactoryTest {
+ @Test
+ public void testSameKey() throws Exception {
+ HiveConf hiveConf = SharedResourcesBrokerFactory
+ .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
+ HiveConf hiveConf1 = SharedResourcesBrokerFactory
+ .getImplicitBroker().getSharedResource(new HiveConfFactory<>(), EmptyKey.INSTANCE);
+ // The factory exists for reuse, so require the identical instance rather than merely an equal one.
+ Assert.assertSame(hiveConf, hiveConf1);
+ }
+}
\ No newline at end of file