You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/24 18:31:58 UTC
git commit: SQOOP-971: Sqoop2: Component reconfigurability
Updated Branches:
refs/heads/sqoop2 156facc49 -> d62567ddf
SQOOP-971: Sqoop2: Component reconfigurability
(Mengwei Ding via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d62567dd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d62567dd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d62567dd
Branch: refs/heads/sqoop2
Commit: d62567ddf3b553956a3cb2a999ef47abdbc8eeb3
Parents: 156facc
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Jun 24 09:31:22 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Jun 24 09:31:22 2013 -0700
----------------------------------------------------------------------
.../sqoop/connector/ConnectorManager.java | 16 +++-
.../java/org/apache/sqoop/core/CoreError.java | 3 +
.../org/apache/sqoop/core/Reconfigurable.java | 29 ++++++
.../apache/sqoop/core/SqoopConfiguration.java | 44 +++++++--
.../sqoop/framework/FrameworkManager.java | 14 ++-
.../org/apache/sqoop/framework/JobManager.java | 57 ++++++++++-
.../repository/JdbcRepositoryProvider.java | 99 ++++++++++++++++++++
.../sqoop/repository/RepositoryManager.java | 31 +++++-
.../sqoop/repository/RepositoryProvider.java | 3 +-
9 files changed, 281 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index 500189a..0540f6b 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -33,12 +33,15 @@ import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.core.Reconfigurable;
+import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.repository.RepositoryTransaction;
import org.apache.sqoop.model.MConnector;
-public class ConnectorManager {
+public class ConnectorManager implements Reconfigurable {
/**
* Logger object.
@@ -184,6 +187,8 @@ public class ConnectorManager {
registerConnectors();
+ SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
if (LOG.isInfoEnabled()) {
LOG.info("Connectors loaded: " + handlerMap);
}
@@ -231,4 +236,13 @@ public class ConnectorManager {
handlerMap = null;
nameMap = null;
}
+
+ @Override
+ public synchronized void configurationChanged() {
+ LOG.info("Begin connector manager reconfiguring");
+ // Placeholder: no ConnectorManager setting is re-read on reload yet.
+ // Apply any future ConnectorManager options between these two log calls.
+ LOG.info("Connector manager reconfigured");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/CoreError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java
index f59d132..eb7c1dc 100644
--- a/core/src/main/java/org/apache/sqoop/core/CoreError.java
+++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java
@@ -51,6 +51,9 @@ public enum CoreError implements ErrorCode {
/** The configuration system has not been initialized correctly. */
CORE_0007("System not initialized"),
+ /** The system has not been reconfigured */
+ CORE_0008("System not reconfigured");
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java b/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java
new file mode 100644
index 0000000..d25ce41
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.core;
+
+/**
+ * Contract for Sqoop server components that need to react when the
+ * server configuration changes at runtime.
+ */
+public interface Reconfigurable {
+  /**
+   * Tells this component that the configuration has changed.
+   */
+  void configurationChanged();
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
index deb24c9..13bbfc2 100644
--- a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -33,7 +32,7 @@ import org.apache.sqoop.common.SqoopException;
/**
* Configuration manager that loads Sqoop configuration.
*/
-public class SqoopConfiguration {
+public class SqoopConfiguration implements Reconfigurable {
/**
* Logger object.
@@ -79,6 +78,7 @@ public class SqoopConfiguration {
private boolean initialized = false;
private ConfigurationProvider provider = null;
private Map<String, String> config = null;
+ private Map<String, String> oldConfig = null;
public synchronized void initialize() {
if (initialized) {
@@ -165,8 +165,9 @@ public class SqoopConfiguration {
// Initialize the configuration provider
provider.initialize(configDir, bootstrapProperties);
- refreshConfiguration();
- provider.registerListener(new CoreConfigurationListener());
+ configurationChanged();
+
+ provider.registerListener(new CoreConfigurationListener(SqoopConfiguration.getInstance()));
initialized = true;
}
@@ -176,10 +177,19 @@ public class SqoopConfiguration {
throw new SqoopException(CoreError.CORE_0007);
}
- Map<String,String> parameters = new HashMap<String, String>();
- parameters.putAll(config);
+ return new MapContext(config);
+ }
+
+ public synchronized MapContext getOldContext() { // context over the configuration held before the last reload
+ if (!initialized) {
+ throw new SqoopException(CoreError.CORE_0007);
+ }
+ // oldConfig is only assigned in configurationChanged(); it is null until a non-null config has been replaced.
+ if (oldConfig == null) {
+ throw new SqoopException(CoreError.CORE_0008);
+ }
- return new MapContext(parameters);
+ return new MapContext(oldConfig);
}
public synchronized void destroy() {
@@ -193,6 +203,7 @@ public class SqoopConfiguration {
provider = null;
configDir = null;
config = null;
+ oldConfig = null;
initialized = false;
}
@@ -209,15 +220,28 @@ public class SqoopConfiguration {
PropertyConfigurator.configure(props);
}
- private synchronized void refreshConfiguration() {
+ public ConfigurationProvider getProvider() { // lets other components register their own listeners on the provider
+ return provider; // the field starts as null and destroy() resets it to null
+ }
+
+ @Override
+ public synchronized void configurationChanged() { // re-reads the configuration from the provider
+ oldConfig = config; // keep the previous map so getOldContext() can expose it
config = provider.getConfiguration();
configureLogging();
}
- public class CoreConfigurationListener implements ConfigurationListener {
+ public static class CoreConfigurationListener implements ConfigurationListener {
+ // Component notified on every change; set once in the constructor, hence final.
+ private final Reconfigurable listener;
+
+ public CoreConfigurationListener(Reconfigurable target) {
+ listener = target;
+ }
+ // Forwards the provider's change notification to the wrapped component.
@Override
public void configurationChanged() {
- refreshConfiguration();
+ listener.configurationChanged();
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 704b809..a81306b 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -19,6 +19,9 @@ package org.apache.sqoop.framework;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.core.Reconfigurable;
+import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
@@ -45,7 +48,7 @@ import java.util.ResourceBundle;
* be the fastest way and we might want to introduce internal structures for
* running jobs in case that this approach will be too slow.
*/
-public class FrameworkManager {
+public class FrameworkManager implements Reconfigurable {
/**
* Logger object.
@@ -141,6 +144,8 @@ public class FrameworkManager {
// Register framework metadata in repository
mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework);
+ SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
LOG.info("Submission manager initialized: OK");
}
@@ -165,4 +170,11 @@ public class FrameworkManager {
FrameworkConstants.RESOURCE_BUNDLE_NAME, locale);
}
+ @Override
+ public void configurationChanged() {
+ LOG.info("Begin framework manager reconfiguring");
+ // Placeholder: no FrameworkManager setting is re-read on reload yet.
+ // Apply any future FrameworkManager options between these two log calls.
+ LOG.info("Framework manager reconfigured");
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 6d22c62..5a2f490 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -22,7 +22,9 @@ import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.*;
@@ -40,7 +42,7 @@ import org.json.simple.JSONValue;
import java.util.Date;
import java.util.List;
-public class JobManager {
+public class JobManager implements Reconfigurable {
/**
* Logger object.
*/
@@ -248,6 +250,8 @@ public class JobManager {
updateThread = new UpdateThread();
updateThread.start();
+ SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
LOG.info("Submission manager initialized: OK");
}
public MSubmission submit(long jobId) {
@@ -495,6 +499,57 @@ public class JobManager {
RepositoryManager.getInstance().getRepository().updateSubmission(submission);
}
+  /**
+   * Validates the reloaded engine settings and refreshes the purge/update
+   * thread parameters from the new configuration.
+   */
+  @Override
+  public synchronized void configurationChanged() {
+    LOG.info("Begin submission engine manager reconfiguring");
+    // Settings after and before the reload, kept side by side for comparison.
+    final MapContext current = SqoopConfiguration.getInstance().getContext();
+    final MapContext previous = SqoopConfiguration.getInstance().getOldContext();
+
+    // Submission engine: must still be configured; a change is only reported.
+    final String submissionEngine =
+        current.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
+    if (submissionEngine == null || submissionEngine.trim().isEmpty()) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0001, submissionEngine);
+    }
+    final String formerSubmissionEngine =
+        previous.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
+    if (!submissionEngine.equals(formerSubmissionEngine)) {
+      LOG.warn("Submission engine cannot be replaced at the runtime. " +
+          "You might need to restart the server.");
+    }
+
+    // Execution engine: same checks as for the submission engine.
+    final String executionEngine =
+        current.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+    if (executionEngine == null || executionEngine.trim().isEmpty()) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0007, executionEngine);
+    }
+    final String formerExecutionEngine =
+        previous.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+    if (!executionEngine.equals(formerExecutionEngine)) {
+      LOG.warn("Execution engine cannot be replaced at the runtime. " +
+          "You might need to restart the server.");
+    }
+
+    // Re-read the purge settings, then interrupt the purge thread.
+    purgeThreshold = current.getLong(
+        FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, DEFAULT_PURGE_THRESHOLD);
+    purgeSleep = current.getLong(
+        FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, DEFAULT_PURGE_SLEEP);
+    purgeThread.interrupt();
+
+    // Re-read the update interval, then interrupt the update thread.
+    updateSleep = current.getLong(
+        FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, DEFAULT_UPDATE_SLEEP);
+    updateThread.interrupt();
+
+    LOG.info("Submission engine manager reconfigured.");
+  }
private class PurgeThread extends Thread {
public PurgeThread() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
index 1fd092a..011527f 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
@@ -164,4 +164,103 @@ public class JdbcRepositoryProvider implements RepositoryProvider {
public synchronized Repository getRepository() {
return repository;
}
+
+  /**
+   * Applies a reloaded configuration to the JDBC repository.
+   *
+   * The handler class, driver class and connection URL are only compared with
+   * their previous values; a difference is logged as a warning. The pool's
+   * maximum number of active connections is applied directly, and the
+   * connection factory is rebuilt when the connection properties or the
+   * transaction isolation differ from the previous configuration.
+   *
+   * @throws SqoopException if the new configuration names no JDBC handler
+   *         class or no JDBC driver class
+   */
+  @Override
+  public synchronized void configurationChanged() {
+    LOG.info("Begin JdbcRepository reconfiguring.");
+    JdbcRepositoryContext oldRepoContext = repoContext;
+    // Build the new context in a local first: if validation fails below, the
+    // provider keeps its previous context instead of an unchecked one.
+    JdbcRepositoryContext newRepoContext =
+        new JdbcRepositoryContext(SqoopConfiguration.getInstance().getContext());
+
+    // reconfigure jdbc handler
+    String newJdbcHandlerClassName = newRepoContext.getHandlerClassName();
+    if (newJdbcHandlerClassName == null
+        || newJdbcHandlerClassName.trim().length() == 0) {
+      throw new SqoopException(RepositoryError.JDBCREPO_0001,
+          newJdbcHandlerClassName);
+    }
+    String oldJdbcHandlerClassName = oldRepoContext.getHandlerClassName();
+    if (!newJdbcHandlerClassName.equals(oldJdbcHandlerClassName)) {
+      LOG.warn("Repository JDBC handler cannot be replaced at the runtime. " +
+          "You might need to restart the server.");
+    }
+
+    // reconfigure jdbc driver
+    String newJdbcDriverClassName = newRepoContext.getDriverClass();
+    if (newJdbcDriverClassName == null
+        || newJdbcDriverClassName.trim().length() == 0) {
+      throw new SqoopException(RepositoryError.JDBCREPO_0003,
+          newJdbcDriverClassName);
+    }
+    String oldJdbcDriverClassName = oldRepoContext.getDriverClass();
+    if (!newJdbcDriverClassName.equals(oldJdbcDriverClassName)) {
+      LOG.warn("Repository JDBC driver cannot be replaced at the runtime. " +
+          "You might need to restart the server.");
+    }
+
+    // Both mandatory settings are present: adopt the new context.
+    repoContext = newRepoContext;
+
+    // reconfigure max connection
+    connectionPool.setMaxActive(repoContext.getMaximumConnections());
+
+    // reconfigure the url of repository
+    String connectUrl = repoContext.getConnectionUrl();
+    String oldurl = oldRepoContext.getConnectionUrl();
+    if (connectUrl != null && !connectUrl.equals(oldurl)) {
+      LOG.warn("Repository URL cannot be replaced at the runtime. " +
+          "You might need to restart the server.");
+    }
+
+    // The connection factory is rebuilt if the connection properties or the
+    // transaction isolation option changed. Properties.equals compares the
+    // entries themselves, so a non-String value cannot trigger the
+    // NullPointerException a getProperty()-based comparison would hit.
+    Properties oldProp = oldRepoContext.getConnectionProperties();
+    Properties newProp = repoContext.getConnectionProperties();
+    boolean connFactoryChanged = !newProp.equals(oldProp);
+
+    // compare the transaction isolation option
+    if (!connFactoryChanged) {
+      String oldTxOption = oldRepoContext.getTransactionIsolation().toString();
+      String newTxOption = repoContext.getTransactionIsolation().toString();
+      connFactoryChanged = !newTxOption.equals(oldTxOption);
+    }
+
+    if (connFactoryChanged) {
+      // try to reconfigure connection factory
+      try {
+        LOG.info("Reconfiguring Connection Factory.");
+        Properties jdbcProps = repoContext.getConnectionProperties();
+        ConnectionFactory connFactory =
+            new DriverManagerConnectionFactory(connectUrl, jdbcProps);
+
+        new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
+            handler.validationQuery(), false, false,
+            repoContext.getTransactionIsolation().getCode());
+      } catch (IllegalStateException ex) {
+        // failed to reconfigure connection factory; log the cause as well
+        LOG.warn("Repository connection cannot be reconfigured currently. " +
+            "You might need to restart the server.", ex);
+      }
+    }
+
+    // ignore the create schema option, because the repo url is not allowed to change
+
+    LOG.info("JdbcRepository reconfigured.");
+  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
index a178238..d77a39b 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
@@ -22,10 +22,12 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.utils.ClassUtils;
-public class RepositoryManager {
+public class RepositoryManager implements Reconfigurable {
/**
* Logger object.
@@ -120,6 +122,8 @@ public class RepositoryManager {
throw new SqoopException(RepositoryError.REPO_0002);
}
+ SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
LOG.info("Repository initialized: OK");
}
@@ -134,4 +138,29 @@ public class RepositoryManager {
public synchronized Repository getRepository() {
return provider.getRepository();
}
+
+  /** Re-validates the provider setting and hands the reload on to the provider. */
+  @Override
+  public synchronized void configurationChanged() {
+    LOG.info("Begin repository manager reconfiguring");
+    final MapContext current = SqoopConfiguration.getInstance().getContext();
+    final MapContext previous = SqoopConfiguration.getInstance().getOldContext();
+    // The provider class must still be configured after the reload.
+    final String providerClass =
+        current.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
+    if (providerClass == null || providerClass.trim().isEmpty()) {
+      throw new SqoopException(RepositoryError.REPO_0001,
+          RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
+    }
+    // A changed provider class is only reported; the running provider stays.
+    final String formerProviderClass =
+        previous.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
+    if (!providerClass.equals(formerProviderClass)) {
+      LOG.warn("Repository provider cannot be replaced at the runtime. " +
+          "You might need to restart the server.");
+    }
+    provider.configurationChanged();
+    LOG.info("Repository manager reconfigured.");
+  }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
index 4ea52e9..1ec6bdf 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
@@ -18,8 +18,9 @@
package org.apache.sqoop.repository;
import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.core.Reconfigurable;
-public interface RepositoryProvider {
+public interface RepositoryProvider extends Reconfigurable {
void initialize(MapContext context);