You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/22 00:57:33 UTC
incubator-gobblin git commit: [GOBBLIN-432] Share the DataSource used
by the MySQL state stores
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 1c5fb6ec4 -> 60adccfd9
[GOBBLIN-432] Share the DataSource used by the MySQL state stores
Closes #2311 from
htran1/mysql_state_store_share_data_source
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/60adccfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/60adccfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/60adccfd
Branch: refs/heads/master
Commit: 60adccfd96a33d19cfac8af6e26dd837b8156f89
Parents: 1c5fb6e
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Mar 21 17:57:27 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Mar 21 17:57:27 2018 -0700
----------------------------------------------------------------------
.../metastore/MysqlDataSourceFactory.java | 85 ++++++++++++++++++
.../gobblin/metastore/MysqlDataSourceKey.java | 71 +++++++++++++++
.../gobblin/metastore/MysqlStateStore.java | 14 +++
.../metastore/MysqlStateStoreFactory.java | 10 ++-
.../metastore/MysqlDataSourceFactoryTest.java | 93 ++++++++++++++++++++
.../runtime/MysqlDatasetStateStoreFactory.java | 12 ++-
6 files changed, 278 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java
new file mode 100644
index 0000000..45bd045
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link SharedResourceFactory} for creating {@link BasicDataSource}s.
+ *
+ * The factory creates a {@link BasicDataSource} with the config.
+ */
+@Slf4j
+public class MysqlDataSourceFactory<S extends ScopeType<S>>
+ implements SharedResourceFactory<BasicDataSource, MysqlDataSourceKey, S> {
+
+ public static final String FACTORY_NAME = "basicDataSource";
+
+ /**
+ * Get a {@link BasicDataSource} based on the config
+ * @param config configuration
+ * @param broker broker
+ * @return a {@link BasicDataSource}
+ * @throws IOException
+ */
+ public static <S extends ScopeType<S>> BasicDataSource get(Config config,
+ SharedResourcesBroker<S> broker) throws IOException {
+ try {
+ return broker.getSharedResource(new MysqlDataSourceFactory<S>(),
+ new MysqlDataSourceKey(MysqlStateStore.getDataSourceId(config), config));
+ } catch (NotConfiguredException nce) {
+ throw new IOException(nce);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return FACTORY_NAME;
+ }
+
+ @Override
+ public SharedResourceFactoryResponse<BasicDataSource> createResource(SharedResourcesBroker<S> broker,
+ ScopedConfigView<S, MysqlDataSourceKey> config) throws NotConfiguredException {
+ MysqlDataSourceKey key = config.getKey();
+ Config configuration = key.getConfig();
+
+ BasicDataSource dataSource = MysqlStateStore.newDataSource(configuration);
+
+ return new ResourceInstance<>(dataSource);
+ }
+
+ @Override
+ public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, MysqlDataSourceKey> config) {
+ return broker.selfScope().getType().rootScope();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java
new file mode 100644
index 0000000..bccab28
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.broker.iface.SharedResourceKey;
+
+import lombok.Getter;
+
+/**
+ * {@link SharedResourceKey} for requesting {@link BasicDataSource}s from a
+ * {@link org.apache.gobblin.broker.iface.SharedResourceFactory}
+ */
+@Getter
+public class MysqlDataSourceKey implements SharedResourceKey {
+ private final String dataSourceName;
+ private final Config config;
+
+ /**
+ * Constructs a key for the mysql data source. The dataSourceName is used as the key.
+ * @param dataSourceName an identifier for the data source
+ * @param config configuration that is passed along to configure the data source
+ */
+ public MysqlDataSourceKey(String dataSourceName, Config config) {
+ this.dataSourceName = dataSourceName;
+ this.config = config;
+ }
+
+ @Override
+ public String toConfigurationKey() {
+ return this.dataSourceName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MysqlDataSourceKey that = (MysqlDataSourceKey) o;
+
+ return dataSourceName == null ?
+ that.dataSourceName == null : dataSourceName.equals(that.dataSourceName);
+ }
+
+ @Override
+ public int hashCode() {
+ return dataSourceName != null ? dataSourceName.hashCode() : 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index b276702..d5ae6ff 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -176,6 +176,20 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
return basicDataSource;
}
+ /**
+ * return an identifier for the data source based on the configuration
+ * @param config configuration
+ * @return a {@link String} to identify the data source
+ */
+ public static String getDataSourceId(Config config) {
+ PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config));
+
+ return ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_JDBC_DRIVER_KEY,
+ ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER) + "::"
+ + config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY) + "::"
+ + passwordManager.readPassword(config.getString(ConfigurationKeys.STATE_STORE_DB_USER_KEY));
+ }
+
@Override
public boolean create(String storeName) throws IOException {
/* nothing to do since state will be stored as a new row in a DB table that has been validated */
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
index aad441a..4af3730 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
@@ -16,25 +16,29 @@
*/
package org.apache.gobblin.metastore;
+import org.apache.commons.dbcp.BasicDataSource;
+
import com.typesafe.config.Config;
+
import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ConfigUtils;
-import java.util.Properties;
-import org.apache.commons.dbcp.BasicDataSource;
@Alias("mysql")
public class MysqlStateStoreFactory implements StateStore.Factory {
@Override
public <T extends State> StateStore<T> createStateStore(Config config, Class<T> stateClass) {
- BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
boolean compressedValues = ConfigUtils.getBoolean(config, ConfigurationKeys.STATE_STORE_COMPRESSED_VALUES_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES);
try {
+ BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
return new MysqlStateStore(basicDataSource, stateStoreTableName, compressedValues, stateClass);
} catch (Exception e) {
throw new RuntimeException("Failed to create MysqlStateStore with factory", e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java
new file mode 100644
index 0000000..a11a4af
--- /dev/null
+++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+/**
+ * Unit tests for {@link MysqlDataSourceFactory}.
+ */
+@Test(groups = { "gobblin.metastore" })
+public class MysqlDataSourceFactoryTest {
+
+ @Test
+ public void testSameKey() throws IOException {
+
+ Config config = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url",
+ ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user",
+ ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+ BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
+ BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
+ Assert.assertEquals(basicDataSource1, basicDataSource2);
+ }
+
+ @Test
+ public void testDifferentKey() throws IOException {
+
+ Config config1 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1",
+ ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user",
+ ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+ Config config2 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url2",
+ ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user",
+ ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+ BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config1,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
+ BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config2,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
+ Assert.assertNotEquals(basicDataSource1, basicDataSource2);
+ }
+
+ @Test
+ public void testSameDbDifferentUser() throws IOException {
+
+ Config config1 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1",
+ ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user1",
+ ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+ Config config2 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1",
+ ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user2",
+ ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+ BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config1,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
+ BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config2,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
+ Assert.assertNotEquals(basicDataSource1, basicDataSource2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
index fea2a62..8d5b291 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
@@ -16,19 +16,20 @@
*/
package org.apache.gobblin.runtime;
+import org.apache.commons.dbcp.BasicDataSource;
+
import com.typesafe.config.Config;
+
import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.DatasetStateStore;
-import org.apache.gobblin.metastore.MysqlStateStore;
-import java.util.Properties;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
@Alias("mysql")
public class MysqlDatasetStateStoreFactory implements DatasetStateStore.Factory {
@Override
public DatasetStateStore<JobState.DatasetState> createStateStore(Config config) {
- BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
String stateStoreTableName = config.hasPath(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY) ?
config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY) :
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE;
@@ -37,6 +38,9 @@ public class MysqlDatasetStateStoreFactory implements DatasetStateStore.Factory
ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES;
try {
+ BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+
return new MysqlDatasetStateStore(basicDataSource, stateStoreTableName, compressedValues);
} catch (Exception e) {
throw new RuntimeException("Failed to create MysqlDatasetStateStore with factory", e);