You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/24 18:31:58 UTC

git commit: SQOOP-971: Sqoop2: Component reconfigurability

Updated Branches:
  refs/heads/sqoop2 156facc49 -> d62567ddf


SQOOP-971: Sqoop2: Component reconfigurability

(Mengwei Ding via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d62567dd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d62567dd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d62567dd

Branch: refs/heads/sqoop2
Commit: d62567ddf3b553956a3cb2a999ef47abdbc8eeb3
Parents: 156facc
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Jun 24 09:31:22 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Jun 24 09:31:22 2013 -0700

----------------------------------------------------------------------
 .../sqoop/connector/ConnectorManager.java       | 16 +++-
 .../java/org/apache/sqoop/core/CoreError.java   |  3 +
 .../org/apache/sqoop/core/Reconfigurable.java   | 29 ++++++
 .../apache/sqoop/core/SqoopConfiguration.java   | 44 +++++++--
 .../sqoop/framework/FrameworkManager.java       | 14 ++-
 .../org/apache/sqoop/framework/JobManager.java  | 57 ++++++++++-
 .../repository/JdbcRepositoryProvider.java      | 99 ++++++++++++++++++++
 .../sqoop/repository/RepositoryManager.java     | 31 +++++-
 .../sqoop/repository/RepositoryProvider.java    |  3 +-
 9 files changed, 281 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index 500189a..0540f6b 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -33,12 +33,15 @@ import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.core.Reconfigurable;
+import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
 import org.apache.sqoop.repository.Repository;
 import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.repository.RepositoryTransaction;
 import org.apache.sqoop.model.MConnector;
 
-public class ConnectorManager {
+public class ConnectorManager implements Reconfigurable {
 
   /**
    * Logger object.
@@ -184,6 +187,8 @@ public class ConnectorManager {
 
     registerConnectors();
 
+    SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
     if (LOG.isInfoEnabled()) {
       LOG.info("Connectors loaded: " + handlerMap);
     }
@@ -231,4 +236,13 @@ public class ConnectorManager {
       handlerMap = null;
       nameMap = null;
   }
+
+  @Override
+  public synchronized void configurationChanged() {
+    LOG.info("Begin connector manager reconfiguring");
+    // If there are configuration options for ConnectorManager,
+    // implement the reconfiguration procedure right here.
+    LOG.info("Connector manager reconfigured");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/CoreError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java
index f59d132..eb7c1dc 100644
--- a/core/src/main/java/org/apache/sqoop/core/CoreError.java
+++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java
@@ -51,6 +51,9 @@ public enum CoreError implements ErrorCode {
   /** The configuration system has not been initialized correctly. */
   CORE_0007("System not initialized"),
 
+  /** The system has not been reconfigured */
+  CORE_0008("System not reconfigured");
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java b/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java
new file mode 100644
index 0000000..d25ce41
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.core;
+
+/**
+ * Interface that makes Sqoop Server components sensitive to
+ * configuration file changes at runtime.
+ */
+public interface Reconfigurable {
+  /**
+   * Notifies this reconfigurable component that the configuration changed.
+   */
+  public void configurationChanged();
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
index deb24c9..13bbfc2 100644
--- a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -33,7 +32,7 @@ import org.apache.sqoop.common.SqoopException;
 /**
  * Configuration manager that loads Sqoop configuration.
  */
-public class SqoopConfiguration {
+public class SqoopConfiguration implements Reconfigurable {
 
   /**
    * Logger object.
@@ -79,6 +78,7 @@ public class SqoopConfiguration {
   private boolean initialized = false;
   private ConfigurationProvider provider = null;
   private Map<String, String> config = null;
+  private Map<String, String> oldConfig = null;
 
   public synchronized void initialize() {
     if (initialized) {
@@ -165,8 +165,9 @@ public class SqoopConfiguration {
 
     // Initialize the configuration provider
     provider.initialize(configDir, bootstrapProperties);
-    refreshConfiguration();
-    provider.registerListener(new CoreConfigurationListener());
+    configurationChanged();
+
+    provider.registerListener(new CoreConfigurationListener(SqoopConfiguration.getInstance()));
 
     initialized = true;
   }
@@ -176,10 +177,19 @@ public class SqoopConfiguration {
       throw new SqoopException(CoreError.CORE_0007);
     }
 
-    Map<String,String> parameters = new HashMap<String, String>();
-    parameters.putAll(config);
+    return new MapContext(config);
+  }
+
+  public synchronized MapContext getOldContext() {
+    if (!initialized) {
+      throw new SqoopException(CoreError.CORE_0007);
+    }
+
+    if (oldConfig == null) {
+      throw new SqoopException(CoreError.CORE_0008);
+    }
 
-    return new MapContext(parameters);
+    return new MapContext(oldConfig);
   }
 
   public synchronized void destroy() {
@@ -193,6 +203,7 @@ public class SqoopConfiguration {
     provider = null;
     configDir = null;
     config = null;
+    oldConfig = null;
     initialized = false;
   }
 
@@ -209,15 +220,28 @@ public class SqoopConfiguration {
     PropertyConfigurator.configure(props);
   }
 
-  private synchronized void refreshConfiguration() {
+  public ConfigurationProvider getProvider() {
+    return provider;
+  }
+
+  @Override
+  public synchronized void configurationChanged() {
+    oldConfig = config;
     config = provider.getConfiguration();
     configureLogging();
   }
 
-  public class CoreConfigurationListener implements ConfigurationListener {
+  public static class CoreConfigurationListener implements ConfigurationListener {
+
+    private Reconfigurable listener;
+
+    public CoreConfigurationListener(Reconfigurable target) {
+      listener = target;
+    }
+
     @Override
     public void configurationChanged() {
-      refreshConfiguration();
+      listener.configurationChanged();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 704b809..a81306b 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -19,6 +19,9 @@ package org.apache.sqoop.framework;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.core.Reconfigurable;
+import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
@@ -45,7 +48,7 @@ import java.util.ResourceBundle;
  * be the fastest way and we might want to introduce internal structures for
  * running jobs in case that this approach will be too slow.
  */
-public class FrameworkManager {
+public class FrameworkManager implements Reconfigurable {
 
   /**
    * Logger object.
@@ -141,6 +144,8 @@ public class FrameworkManager {
     // Register framework metadata in repository
     mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework);
 
+    SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
     LOG.info("Submission manager initialized: OK");
   }
 
@@ -165,4 +170,11 @@ public class FrameworkManager {
         FrameworkConstants.RESOURCE_BUNDLE_NAME, locale);
   }
 
+  @Override
+  public void configurationChanged() {
+    LOG.info("Begin framework manager reconfiguring");
+    // Nothing is re-read here yet; put FrameworkManager reconfiguration here.
+    // NOTE(review): not synchronized, unlike the other managers' overrides.
+    LOG.info("Framework manager reconfigured");
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 6d22c62..5a2f490 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -22,7 +22,9 @@ import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.etl.*;
@@ -40,7 +42,7 @@ import org.json.simple.JSONValue;
 import java.util.Date;
 import java.util.List;
 
-public class JobManager {
+public class JobManager implements Reconfigurable {
    /**
     * Logger object.
     */
@@ -248,6 +250,8 @@ public class JobManager {
        updateThread = new UpdateThread();
        updateThread.start();
 
+       SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
        LOG.info("Submission manager initialized: OK");
    }
    public MSubmission submit(long jobId) {
@@ -495,6 +499,57 @@ public class JobManager {
        RepositoryManager.getInstance().getRepository().updateSubmission(submission);
    }
 
+   @Override
+   public synchronized void configurationChanged() {
+     LOG.info("Begin submission engine manager reconfiguring");
+     MapContext newContext = SqoopConfiguration.getInstance().getContext();
+     MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
+
+     String newSubmissionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
+     if (newSubmissionEngineClassName == null
+         || newSubmissionEngineClassName.trim().length() == 0) {
+       throw new SqoopException(FrameworkError.FRAMEWORK_0001,
+           newSubmissionEngineClassName);
+     }
+
+     String oldSubmissionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
+     if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
+       LOG.warn("Submission engine cannot be replaced at the runtime. " +
+                "You might need to restart the server.");
+     }
+
+     String newExecutionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+     if (newExecutionEngineClassName == null
+         || newExecutionEngineClassName.trim().length() == 0) {
+       throw new SqoopException(FrameworkError.FRAMEWORK_0007,
+           newExecutionEngineClassName);
+     }
+
+     String oldExecutionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+     if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
+       LOG.warn("Execution engine cannot be replaced at the runtime. " +
+                "You might need to restart the server.");
+     }
+
+     // Re-read worker thread settings; interrupt() presumably wakes the sleeping threads
+     purgeThreshold = newContext.getLong(
+       FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
+       DEFAULT_PURGE_THRESHOLD
+     );
+     purgeSleep = newContext.getLong(
+       FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
+       DEFAULT_PURGE_SLEEP
+     );
+     purgeThread.interrupt();
+
+     updateSleep = newContext.getLong(
+       FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
+       DEFAULT_UPDATE_SLEEP
+     );
+     updateThread.interrupt();
+
+     LOG.info("Submission engine manager reconfigured.");
+   }
 
    private class PurgeThread extends Thread {
        public PurgeThread() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
index 1fd092a..011527f 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java
@@ -164,4 +164,103 @@ public class JdbcRepositoryProvider implements RepositoryProvider {
   public synchronized Repository getRepository() {
     return repository;
   }
+
+  /**
+   * Re-reads the JDBC repository settings after a configuration change.
+   * Handler, driver and URL changes are only reported with a warning.
+   */
+  @Override
+  public void configurationChanged() {
+    LOG.info("Begin JdbcRepository reconfiguring.");
+    JdbcRepositoryContext oldRepoContext = repoContext;
+    JdbcRepositoryContext newRepoContext =
+        new JdbcRepositoryContext(SqoopConfiguration.getInstance().getContext());
+
+    // reconfigure jdbc handler
+    String newJdbcHandlerClassName = newRepoContext.getHandlerClassName();
+    if (newJdbcHandlerClassName == null
+        || newJdbcHandlerClassName.trim().length() == 0) {
+      throw new SqoopException(RepositoryError.JDBCREPO_0001, newJdbcHandlerClassName);
+    }
+
+    String oldJdbcHandlerClassName = oldRepoContext.getHandlerClassName();
+    if (!newJdbcHandlerClassName.equals(oldJdbcHandlerClassName)) {
+      LOG.warn("Repository JDBC handler cannot be replaced at the runtime. " +
+               "You might need to restart the server.");
+    }
+
+    // reconfigure jdbc driver
+    String newJdbcDriverClassName = newRepoContext.getDriverClass();
+    if (newJdbcDriverClassName == null
+        || newJdbcDriverClassName.trim().length() == 0) {
+      throw new SqoopException(RepositoryError.JDBCREPO_0003, newJdbcDriverClassName);
+    }
+
+    String oldJdbcDriverClassName = oldRepoContext.getDriverClass();
+    if (!newJdbcDriverClassName.equals(oldJdbcDriverClassName)) {
+      LOG.warn("Repository JDBC driver cannot be replaced at the runtime. " +
+               "You might need to restart the server.");
+    }
+
+    // adopt the new context only now, so a rejected one never replaces the old
+    repoContext = newRepoContext;
+
+    // reconfigure max connection
+    connectionPool.setMaxActive(repoContext.getMaximumConnections());
+
+    // reconfigure the url of repository
+    String connectUrl = repoContext.getConnectionUrl();
+    String oldurl = oldRepoContext.getConnectionUrl();
+    if (connectUrl != null && !connectUrl.equals(oldurl)) {
+      LOG.warn("Repository URL cannot be replaced at the runtime. " +
+               "You might need to restart the server.");
+    }
+
+    // the connection factory is rebuilt when the connection properties or
+    // the transaction isolation option change; compare the properties first
+    Properties oldProp = oldRepoContext.getConnectionProperties();
+    Properties newProp = repoContext.getConnectionProperties();
+    boolean connFactoryChanged = newProp.size() != oldProp.size();
+    if (!connFactoryChanged) {
+      for (Object key : newProp.keySet()) {
+        if (!newProp.getProperty((String) key).equals(oldProp.getProperty((String) key))) {
+          connFactoryChanged = true;
+          break;
+        }
+      }
+    }
+
+    // compare the transaction isolation option
+    if (!connFactoryChanged) {
+      String oldTxOption = oldRepoContext.getTransactionIsolation().toString();
+      String newTxOption = repoContext.getTransactionIsolation().toString();
+
+      if (!newTxOption.equals(oldTxOption)) {
+        connFactoryChanged = true;
+      }
+    }
+
+    if (connFactoryChanged) {
+      // try to reconfigure connection factory
+      try {
+        LOG.info("Reconfiguring Connection Factory.");
+        Properties jdbcProps = repoContext.getConnectionProperties();
+
+        ConnectionFactory connFactory = new DriverManagerConnectionFactory(connectUrl, jdbcProps);
+
+        // NOTE(review): instance is discarded; presumably it registers itself with the pool
+        new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
+                handler.validationQuery(), false, false,
+                repoContext.getTransactionIsolation().getCode());
+      } catch (IllegalStateException ex) {
+        // failed to reconfigure connection factory; keep the cause in the log
+        LOG.warn("Repository connection cannot be reconfigured currently. " +
+                 "You might need to restart the server.", ex);
+      }
+    }
+
+    // ignore the create schema option, because the repo url is not allowed to change
+
+    LOG.info("JdbcRepository reconfigured.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
index a178238..d77a39b 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
@@ -22,10 +22,12 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
 import org.apache.sqoop.utils.ClassUtils;
 
-public class RepositoryManager {
+public class RepositoryManager implements Reconfigurable {
 
   /**
    * Logger object.
@@ -120,6 +122,8 @@ public class RepositoryManager {
       throw new SqoopException(RepositoryError.REPO_0002);
     }
 
+    SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
+
     LOG.info("Repository initialized: OK");
   }
 
@@ -134,4 +138,29 @@ public class RepositoryManager {
   public synchronized Repository getRepository() {
     return provider.getRepository();
   }
+
+  @Override
+  public synchronized void configurationChanged() {
+    LOG.info("Begin repository manager reconfiguring");
+    MapContext newContext = SqoopConfiguration.getInstance().getContext();
+    MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
+
+    String newProviderClassName = newContext.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
+    if (newProviderClassName == null
+        || newProviderClassName.trim().length() == 0) {
+      throw new SqoopException(RepositoryError.REPO_0001,
+          RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
+    }
+
+    String oldProviderClassName = oldContext.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
+    if (!newProviderClassName.equals(oldProviderClassName)) {
+      LOG.warn("Repository provider cannot be replaced at the runtime. " +
+               "You might need to restart the server.");
+    }
+
+    provider.configurationChanged();
+
+    LOG.info("Repository manager reconfigured.");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
index 4ea52e9..1ec6bdf 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java
@@ -18,8 +18,9 @@
 package org.apache.sqoop.repository;
 
 import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.core.Reconfigurable;
 
-public interface RepositoryProvider {
+public interface RepositoryProvider extends Reconfigurable {
 
   void initialize(MapContext context);