You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/22 00:57:33 UTC

incubator-gobblin git commit: [GOBBLIN-432] Share the DataSource used by the MySQL state stores

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1c5fb6ec4 -> 60adccfd9


[GOBBLIN-432] Share the DataSource used by the MySQL state stores

Closes #2311 from
htran1/mysql_state_store_share_data_source


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/60adccfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/60adccfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/60adccfd

Branch: refs/heads/master
Commit: 60adccfd96a33d19cfac8af6e26dd837b8156f89
Parents: 1c5fb6e
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Mar 21 17:57:27 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Mar 21 17:57:27 2018 -0700

----------------------------------------------------------------------
 .../metastore/MysqlDataSourceFactory.java       | 85 ++++++++++++++++++
 .../gobblin/metastore/MysqlDataSourceKey.java   | 71 +++++++++++++++
 .../gobblin/metastore/MysqlStateStore.java      | 14 +++
 .../metastore/MysqlStateStoreFactory.java       | 10 ++-
 .../metastore/MysqlDataSourceFactoryTest.java   | 93 ++++++++++++++++++++
 .../runtime/MysqlDatasetStateStoreFactory.java  | 12 ++-
 6 files changed, 278 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java
new file mode 100644
index 0000000..45bd045
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link SharedResourceFactory} for creating {@link BasicDataSource}s that are keyed by a
+ * {@link MysqlDataSourceKey}. Each data source is built by {@link MysqlStateStore#newDataSource(Config)}
+ * from the {@link Config} carried in the key, and is auto-scoped to the root scope of the broker's scope type.
+ */
+@Slf4j
+public class MysqlDataSourceFactory<S extends ScopeType<S>>
+    implements SharedResourceFactory<BasicDataSource, MysqlDataSourceKey, S> {
+
+  public static final String FACTORY_NAME = "basicDataSource";
+
+  /**
+   * Get the shared {@link BasicDataSource} whose key id is {@link MysqlStateStore#getDataSourceId(Config)}.
+   * @param config configuration used to derive the data source id; it is also stored in the resource key
+   * @param broker the {@link SharedResourcesBroker} the data source is requested from
+   * @return the {@link BasicDataSource} returned by the broker for that key
+   * @throws IOException wrapping the {@link NotConfiguredException} thrown by the broker lookup
+   */
+  public static <S extends ScopeType<S>> BasicDataSource get(Config config,
+      SharedResourcesBroker<S> broker) throws IOException {
+    try {
+      return broker.getSharedResource(new MysqlDataSourceFactory<S>(),
+          new MysqlDataSourceKey(MysqlStateStore.getDataSourceId(config), config));
+    } catch (NotConfiguredException nce) {
+      throw new IOException(nce);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public SharedResourceFactoryResponse<BasicDataSource> createResource(SharedResourcesBroker<S> broker,
+    ScopedConfigView<S, MysqlDataSourceKey> config) throws NotConfiguredException {
+    MysqlDataSourceKey key = config.getKey();
+    Config configuration = key.getConfig();
+
+    BasicDataSource dataSource = MysqlStateStore.newDataSource(configuration);
+
+    return new ResourceInstance<>(dataSource);
+  }
+
+  @Override
+  public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, MysqlDataSourceKey> config) {
+    return broker.selfScope().getType().rootScope();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java
new file mode 100644
index 0000000..bccab28
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.broker.iface.SharedResourceKey;
+
+import lombok.Getter;
+
+/**
+ * {@link SharedResourceKey} used to request a {@link BasicDataSource} from a
+ * {@link org.apache.gobblin.broker.iface.SharedResourceFactory}; the config is ignored by equals and hashCode.
+ */
+@Getter
+public class MysqlDataSourceKey implements SharedResourceKey {
+  private final String dataSourceName;
+  private final Config config;
+
+  /**
+   * Constructs a key for the MySQL data source. Keys with equal dataSourceName are equal, whatever their config.
+   * @param dataSourceName identifier for the data source; also returned by {@link #toConfigurationKey()}
+   * @param config configuration that is passed along to configure the data source; not part of key equality
+   */
+  public MysqlDataSourceKey(String dataSourceName, Config config) {
+    this.dataSourceName = dataSourceName;
+    this.config = config;
+  }
+
+  @Override
+  public String toConfigurationKey() {
+    return this.dataSourceName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    MysqlDataSourceKey that = (MysqlDataSourceKey) o;
+
+    return dataSourceName == null ?
+        that.dataSourceName == null : dataSourceName.equals(that.dataSourceName);
+  }
+
+  @Override
+  public int hashCode() {
+    return dataSourceName != null ? dataSourceName.hashCode() : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index b276702..d5ae6ff 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -176,6 +176,20 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     return basicDataSource;
   }
 
+  /**
+   * Return an identifier for the data source: the JDBC driver, DB URL and DB user from config, joined by "::".
+   * @param config configuration; URL and user are read via getString, the driver via ConfigUtils with a default
+   * @return a {@link String} identifying the data source; the user part is passed through PasswordManager
+   */
+  public static String getDataSourceId(Config config) {
+    PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config));
+
+    return ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_JDBC_DRIVER_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER) + "::"
+        + config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY) + "::"
+        + passwordManager.readPassword(config.getString(ConfigurationKeys.STATE_STORE_DB_USER_KEY));
+  }
+
   @Override
   public boolean create(String storeName) throws IOException {
     /* nothing to do since state will be stored as a new row in a DB table that has been validated */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
index aad441a..4af3730 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
@@ -16,25 +16,29 @@
  */
 package org.apache.gobblin.metastore;
 
+import org.apache.commons.dbcp.BasicDataSource;
+
 import com.typesafe.config.Config;
+
 import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.ConfigUtils;
-import java.util.Properties;
-import org.apache.commons.dbcp.BasicDataSource;
 
 @Alias("mysql")
 public class MysqlStateStoreFactory implements StateStore.Factory {
   @Override
   public <T extends State> StateStore<T> createStateStore(Config config, Class<T> stateClass) {
-    BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
     String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
         ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
     boolean compressedValues = ConfigUtils.getBoolean(config, ConfigurationKeys.STATE_STORE_COMPRESSED_VALUES_KEY,
             ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES);
 
     try {
+      BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
+          SharedResourcesBrokerFactory.getImplicitBroker());
+
       return new MysqlStateStore(basicDataSource, stateStoreTableName, compressedValues, stateClass);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create MysqlStateStore with factory", e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java
new file mode 100644
index 0000000..a11a4af
--- /dev/null
+++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+/**
+ * Unit tests for {@link MysqlDataSourceFactory}: the same config yields equal data sources, a different URL or user does not.
+ */
+@Test(groups = { "gobblin.metastore" })
+public class MysqlDataSourceFactoryTest {
+
+  @Test
+  public void testSameKey() throws IOException {
+
+    Config config = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url",
+        ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user",
+        ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+    BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+
+    BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+
+    Assert.assertEquals(basicDataSource1, basicDataSource2);
+  }
+
+  @Test
+  public void testDifferentKey() throws IOException {
+
+    Config config1 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1",
+        ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user",
+        ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+    Config config2 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url2",
+        ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user",
+        ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+    BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config1,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+
+    BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config2,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+
+    Assert.assertNotEquals(basicDataSource1, basicDataSource2);
+  }
+
+  @Test
+  public void testSameDbDifferentUser() throws IOException {
+
+    Config config1 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1",
+        ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user1",
+        ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+    Config config2 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1",
+        ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user2",
+        ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd"));
+
+    BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config1,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+
+    BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config2,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+
+    Assert.assertNotEquals(basicDataSource1, basicDataSource2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
index fea2a62..8d5b291 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java
@@ -16,19 +16,20 @@
  */
 package org.apache.gobblin.runtime;
 
+import org.apache.commons.dbcp.BasicDataSource;
+
 import com.typesafe.config.Config;
+
 import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.DatasetStateStore;
-import org.apache.gobblin.metastore.MysqlStateStore;
-import java.util.Properties;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 
 @Alias("mysql")
 public class MysqlDatasetStateStoreFactory implements DatasetStateStore.Factory {
   @Override
   public DatasetStateStore<JobState.DatasetState> createStateStore(Config config) {
-    BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
     String stateStoreTableName = config.hasPath(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY) ?
         config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY) :
         ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE;
@@ -37,6 +38,9 @@ public class MysqlDatasetStateStoreFactory implements DatasetStateStore.Factory
         ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES;
 
     try {
+      BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
+          SharedResourcesBrokerFactory.getImplicitBroker());
+
       return new MysqlDatasetStateStore(basicDataSource, stateStoreTableName, compressedValues);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create MysqlDatasetStateStore with factory", e);