You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/21 15:40:01 UTC
[2/2] git commit: SQOOP-1551: Repository Upgrader api - Extensibility
SQOOP-1551: Repository Upgrader api - Extensibility
(Veena Basavaraj via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/39a22000
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/39a22000
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/39a22000
Branch: refs/heads/sqoop2
Commit: 39a22000079412d6181d43f40ae3cb20afc624c5
Parents: 3257b38
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Oct 21 06:39:32 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Oct 21 06:39:32 2014 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/model/Configurable.java | 25 +++
.../apache/sqoop/model/MConfigurableType.java | 30 +++
.../java/org/apache/sqoop/model/MConnector.java | 4 +-
.../java/org/apache/sqoop/model/MDriver.java | 2 +-
.../main/java/org/apache/sqoop/model/MJob.java | 8 +
.../connector/jdbc/GenericJdbcConnector.java | 4 +-
.../jdbc/GenericJdbcConnectorUpgrader.java | 63 ++-----
.../connector/hdfs/HdfsConfigUpgrader.java | 81 ---------
.../sqoop/connector/hdfs/HdfsConnector.java | 8 +-
.../connector/hdfs/HdfsConnectorUpgrader.java | 45 +++++
.../sqoop/connector/ConnectorHandler.java | 32 ++--
.../sqoop/connector/ConnectorManager.java | 33 ++--
.../java/org/apache/sqoop/driver/Driver.java | 18 +-
.../sqoop/driver/DriverConfigUpgrader.java | 77 --------
.../sqoop/driver/DriverConfigValidator.java | 6 +-
.../org/apache/sqoop/driver/DriverUpgrader.java | 29 +++
.../org/apache/sqoop/driver/JobManager.java | 14 +-
.../configuration/DriverConfiguration.java | 34 ----
.../driver/configuration/JobConfiguration.java | 34 ++++
.../apache/sqoop/repository/JdbcRepository.java | 12 +-
.../sqoop/repository/JdbcRepositoryHandler.java | 4 +-
.../org/apache/sqoop/repository/Repository.java | 181 ++++++++++---------
.../sqoop/driver/TestDriverConfigUpgrader.java | 62 ++++---
.../org/apache/sqoop/driver/TestJobManager.java | 6 +-
.../sqoop/repository/TestJdbcRepository.java | 128 +++++++------
.../derby/DerbyRepositoryHandler.java | 40 ++--
.../repository/derby/TestDriverHandling.java | 2 +-
.../sqoop/handler/ConnectorRequestHandler.java | 4 +-
.../apache/sqoop/handler/JobRequestHandler.java | 10 +-
.../sqoop/handler/LinkRequestHandler.java | 4 +-
.../configurable/ConfigurableUpgradeUtil.java | 62 +++++++
.../sqoop/connector/ConfigurableError.java | 43 +++++
.../spi/ConnectorConfigurableUpgrader.java | 84 +++++++++
.../sqoop/connector/spi/RepositoryUpgrader.java | 51 ------
.../sqoop/connector/spi/SqoopConnector.java | 6 +-
.../sqoop/tools/tool/RepositoryDumpTool.java | 2 +-
.../sqoop/tools/tool/RepositoryLoadTool.java | 71 ++++----
37 files changed, 713 insertions(+), 606 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/Configurable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/Configurable.java b/common/src/main/java/org/apache/sqoop/model/Configurable.java
new file mode 100644
index 0000000..2033fcb
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/model/Configurable.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.model;
+
+/**
+ * Marker class that identifies the Configurables in the Sqoop system
+ */
+public abstract class Configurable extends MPersistableEntity implements MClonable {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java b/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java
new file mode 100644
index 0000000..7ab7032
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.model;
+
+/**
+ * Represents the sqoop entities that can own configs
+ */
+public enum MConfigurableType {
+
+ /** Connector as a owner of config keys */
+ CONNECTOR,
+
+ /** Driver as a owner of config keys */
+ DRIVER;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MConnector.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java
index 2f42191..174d0b9 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnector.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java
@@ -23,11 +23,11 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.SupportedDirections;
/**
- * Connector entity supports the FROM/TO {@link Transferable} Includes unique id
+ * Connector entity supports the FROM/TO {@link org.apache.sqoop.job.etl.Transfereable} Includes unique id
* that identifies connector in the repository, unique human readable name,
* corresponding name and all configs to support the from and to data sources
*/
-public final class MConnector extends MPersistableEntity implements MClonable {
+public final class MConnector extends Configurable {
private final String uniqueName;
private final String className;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MDriver.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MDriver.java b/common/src/main/java/org/apache/sqoop/model/MDriver.java
index 685439e..4241a31 100644
--- a/common/src/main/java/org/apache/sqoop/model/MDriver.java
+++ b/common/src/main/java/org/apache/sqoop/model/MDriver.java
@@ -22,7 +22,7 @@ import java.sql.Driver;
/**
* Describes the configs associated with the {@link Driver} for executing sqoop jobs.
*/
-public class MDriver extends MPersistableEntity implements MClonable {
+public final class MDriver extends Configurable {
private final MDriverConfig driverConfig;
private final String version;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index b3dec27..935dd18 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -139,6 +139,14 @@ public class MJob extends MAccountableEntity implements MClonable {
}
}
+ public long getFromConnectorId() {
+ return fromConnectorId;
+ }
+
+ public long getToConnectorId() {
+ return toConnectorId;
+ }
+
public MConfigList getJobConfig(Direction type) {
switch(type) {
case FROM:
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index 87ac2af..8469064 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -25,7 +25,7 @@ import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -97,7 +97,7 @@ public class GenericJdbcConnector extends SqoopConnector {
}
@Override
- public RepositoryUpgrader getRepositoryUpgrader() {
+ public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
return new GenericJdbcConnectorUpgrader();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
index a069b3e..fb92a39 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
@@ -18,64 +18,27 @@
*/
package org.apache.sqoop.connector.jdbc;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
-public class GenericJdbcConnectorUpgrader extends RepositoryUpgrader {
- private static final Logger LOG = Logger.getLogger(GenericJdbcConnectorUpgrader.class);
-
- /*
- * For now, there is no real upgrade. So copy all data over,
- * set the validation messages and error messages to be the same as for the
- * inputs in the original one.
- */
+// NOTE: All config types have the similar upgrade path at this point
+public class GenericJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {
@Override
- public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
- doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+ public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
@Override
- public void upgrade(MConfigList original, MConfigList upgradeTarget) {
- doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+ public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
- @SuppressWarnings("unchecked")
- private void doUpgrade(List<MConfig> original, List<MConfig> target) {
- // Easier to find the config in the original list if we use a map.
- // Since the constructor takes a list,
- // index is not guaranteed to be the same, so we need to look for
- // equivalence
- Map<String, MConfig> configMap = new HashMap<String, MConfig>();
- for (MConfig config : original) {
- configMap.put(config.getName(), config);
- }
- for (MConfig config : target) {
- List<MInput<?>> inputs = config.getInputs();
- MConfig orginalConfig = configMap.get(config.getName());
- if (orginalConfig == null) {
- LOG.warn("Config: '" + config.getName() + "' not present in old " +
- "generic JDBC connector. So it and its inputs will not be transferred by the upgrader.");
- continue;
- }
- for (MInput input : inputs) {
- try {
- MInput originalInput = orginalConfig.getInput(input.getName());
- input.setValue(originalInput.getValue());
- } catch (SqoopException ex) {
- LOG.warn("Input: '" + input.getName() + "' not present in old " +
- "generic JDBC connector. So it will not be transferred by the upgrader.");
- }
- }
- }
+ @Override
+ public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
deleted file mode 100644
index b17aa21..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.connector.hdfs;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MLinkConfig;
-
-public class HdfsConfigUpgrader extends RepositoryUpgrader {
- private static final Logger LOG = Logger.getLogger(HdfsConfigUpgrader.class);
-
- /*
- * For now, there is no real upgrade. So copy all data over,
- * set the validation messages and error messages to be the same as for the
- * inputs in the original one.
- */
-
- @Override
- public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
- doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
- }
-
- @Override
- public void upgrade(MConfigList original, MConfigList upgradeTarget) {
- doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
- }
-
- @SuppressWarnings("unchecked")
- private void doUpgrade(List<MConfig> original, List<MConfig> target) {
- // Easier to find the config in the original list if we use a map.
- // Since the constructor takes a list,
- // index is not guaranteed to be the same, so we need to look for
- // equivalence
- Map<String, MConfig> configMap = new HashMap<String, MConfig>();
- for (MConfig config : original) {
- configMap.put(config.getName(), config);
- }
- for (MConfig config : target) {
- List<MInput<?>> inputs = config.getInputs();
- MConfig originalConfig = configMap.get(config.getName());
- if (originalConfig == null) {
- LOG.warn("Config: '" + config.getName() + "' not present in old " +
- "connector. So it and its inputs will not be transferred by the upgrader.");
- continue;
- }
- for (MInput input : inputs) {
- try {
- MInput originalInput = originalConfig.getInput(input.getName());
- input.setValue(originalInput.getValue());
- } catch (SqoopException ex) {
- LOG.warn("Input: '" + input.getName() + "' not present in old " +
- "connector. So it will not be transferred by the upgrader.");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index 606b9fa..e63e464 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -24,7 +24,7 @@ import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
@@ -123,13 +123,13 @@ public class HdfsConnector extends SqoopConnector {
}
/**
- * Returns an {@linkplain org.apache.sqoop.connector.spi.RepositoryUpgrader} object that can upgrade the
+ * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the
* connection and job metadata.
*
* @return MetadataUpgrader object
*/
@Override
- public RepositoryUpgrader getRepositoryUpgrader() {
- return new HdfsConfigUpgrader();
+ public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
+ return new HdfsConnectorUpgrader();
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java
new file mode 100644
index 0000000..14862eb
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
+
+//NOTE: All config types have the similar upgrade path at this point
+public class HdfsConnectorUpgrader extends ConnectorConfigurableUpgrader {
+
+ @Override
+ public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+ }
+
+ @Override
+ public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+ }
+
+ @Override
+ public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
index 54bdd13..1919b4b 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
@@ -43,15 +43,14 @@ public final class ConnectorHandler {
private final String connectorUniqueName;
private final SqoopConnector connector;
- private MConnector mConnector;
+ private MConnector connectorConfigurable;
public ConnectorHandler(URL configFileUrl) {
connectorUrl = configFileUrl.toString();
try {
properties.load(configFileUrl.openStream());
} catch (IOException ex) {
- throw new SqoopException(ConnectorError.CONN_0003,
- configFileUrl.toString(), ex);
+ throw new SqoopException(ConnectorError.CONN_0003, configFileUrl.toString(), ex);
}
LOG.debug("Connector configuration: " + properties);
@@ -64,12 +63,9 @@ public final class ConnectorHandler {
ConfigurationConstants.CONPROP_PROVIDER_CLASS);
}
+ connectorUniqueName = properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME);
- connectorUniqueName = properties.getProperty(
- ConfigurationConstants.CONNPROP_CONNECTOR_NAME);
-
- if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0)
- {
+ if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0) {
throw new SqoopException(ConnectorError.CONN_0008, connectorClassName);
}
@@ -103,13 +99,11 @@ public final class ConnectorHandler {
connector.getJobConfigurationClass(Direction.TO)));
}
- MLinkConfig connectionForms = new MLinkConfig(
+ MLinkConfig linkConfig = new MLinkConfig(
ConfigUtils.toConfigs(connector.getLinkConfigurationClass()));
- String connectorVersion = connector.getVersion();
-
- mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion,
- connectionForms, fromConfig, toConfig);
+ connectorConfigurable = new MConnector(connectorUniqueName, connectorClassName, connector.getVersion(),
+ linkConfig, fromConfig, toConfig);
if (LOG.isInfoEnabled()) {
LOG.info("Connector [" + connectorClassName + "] initialized.");
@@ -133,15 +127,15 @@ public final class ConnectorHandler {
return connectorUrl;
}
- public MConnector getMetadata() {
- return mConnector;
+ public MConnector getConnectorConfigurable() {
+ return connectorConfigurable;
}
- public void setMetadata(MConnector connector) {
- this.mConnector = connector;
+ public void setConnectorConfigurable(MConnector mConnector) {
+ this.connectorConfigurable = mConnector;
}
- public SqoopConnector getConnector() {
+ public SqoopConnector getSqoopConnector() {
return connector;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index 5226926..0369b4d 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -92,10 +92,10 @@ public class ConnectorManager implements Reconfigurable {
private Map<String, ConnectorHandler> handlerMap =
new HashMap<String, ConnectorHandler>();
- public List<MConnector> getConnectorsMetadata() {
+ public List<MConnector> getConnectorConfigurables() {
List<MConnector> connectors = new LinkedList<MConnector>();
for(ConnectorHandler handler : handlerMap.values()) {
- connectors.add(handler.getMetadata());
+ connectors.add(handler.getConnectorConfigurable());
}
return connectors;
}
@@ -107,8 +107,8 @@ public class ConnectorManager implements Reconfigurable {
public Map<Long, ResourceBundle> getResourceBundles(Locale locale) {
Map<Long, ResourceBundle> bundles = new HashMap<Long, ResourceBundle>();
for(ConnectorHandler handler : handlerMap.values()) {
- long id = handler.getMetadata().getPersistenceId();
- ResourceBundle bundle = handler.getConnector().getBundle(locale);
+ long id = handler.getConnectorConfigurable().getPersistenceId();
+ ResourceBundle bundle = handler.getSqoopConnector().getBundle(locale);
bundles.put(id, bundle);
}
return bundles;
@@ -116,25 +116,24 @@ public class ConnectorManager implements Reconfigurable {
public ResourceBundle getResourceBundle(long connectorId, Locale locale) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
- return handler.getConnector().getBundle(locale);
+ return handler.getSqoopConnector().getBundle(locale);
}
- public MConnector getConnectorConfig(long connectorId) {
+ public MConnector getConnectorConfigurable(long connectorId) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
if(handler == null) {
return null;
}
-
- return handler.getMetadata();
+ return handler.getConnectorConfigurable();
}
- public SqoopConnector getConnector(long connectorId) {
+ public SqoopConnector getSqoopConnector(long connectorId) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
- return handler.getConnector();
+ return handler.getSqoopConnector();
}
- public SqoopConnector getConnector(String uniqueName) {
- return handlerMap.get(uniqueName).getConnector();
+ public SqoopConnector getSqoopConnector(String uniqueName) {
+ return handlerMap.get(uniqueName).getSqoopConnector();
}
public synchronized void initialize() {
@@ -182,21 +181,21 @@ public class ConnectorManager implements Reconfigurable {
rtx.begin();
for (String name : handlerMap.keySet()) {
ConnectorHandler handler = handlerMap.get(name);
- MConnector connectorMetadata = handler.getMetadata();
+ MConnector connectorMetadata = handler.getConnectorConfigurable();
MConnector registeredMetadata =
repository.registerConnector(connectorMetadata, autoUpgrade);
// Set registered metadata instead of connector metadata as they will
// have filled persistent ids. We should be confident at this point that
// there are no differences between those two structures.
- handler.setMetadata(registeredMetadata);
+ handler.setConnectorConfigurable(registeredMetadata);
String connectorName = handler.getUniqueName();
- if (!handler.getMetadata().hasPersistenceId()) {
+ if (!handler.getConnectorConfigurable().hasPersistenceId()) {
throw new SqoopException(ConnectorError.CONN_0010, connectorName);
}
- nameMap.put(handler.getMetadata().getPersistenceId(), connectorName);
- LOG.debug("Registered connector: " + handler.getMetadata());
+ nameMap.put(handler.getConnectorConfigurable().getPersistenceId(), connectorName);
+ LOG.debug("Registered connector: " + handler.getConnectorConfigurable());
}
rtx.commit();
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/Driver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java
index f1b45bb..46a16ac 100644
--- a/core/src/main/java/org/apache/sqoop/driver/Driver.java
+++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java
@@ -22,12 +22,11 @@ import java.util.Locale;
import java.util.ResourceBundle;
import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.driver.configuration.DriverConfiguration;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MConfig;
@@ -105,25 +104,26 @@ public class Driver implements Reconfigurable {
/**
* Driver config upgrader instance
*/
- private final RepositoryUpgrader driverConfigUpgrader;
+ private final DriverUpgrader driverUpgrader;
/**
* Default driver config auto upgrade option value
*/
private static final boolean DEFAULT_AUTO_UPGRADE = false;
- public Class getDriverConfigurationGroupClass() {
- return DriverConfiguration.class;
+ @SuppressWarnings("rawtypes")
+ public Class getDriverJobConfigurationClass() {
+ return JobConfiguration.class;
}
public Driver() {
- List<MConfig> driverConfig = ConfigUtils.toConfigs(getDriverConfigurationGroupClass());
+ List<MConfig> driverConfig = ConfigUtils.toConfigs(getDriverJobConfigurationClass());
mDriver = new MDriver(new MDriverConfig(driverConfig), DriverBean.CURRENT_DRIVER_VERSION);
// Build validator
driverValidator = new DriverConfigValidator();
// Build upgrader
- driverConfigUpgrader = new DriverConfigUpgrader();
+ driverUpgrader = new DriverUpgrader();
}
public synchronized void initialize() {
@@ -150,8 +150,8 @@ public class Driver implements Reconfigurable {
return driverValidator;
}
- public RepositoryUpgrader getDriverConfigRepositoryUpgrader() {
- return driverConfigUpgrader;
+ public DriverUpgrader getConfigurableUpgrader() {
+ return driverUpgrader;
}
public MDriver getDriver() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java b/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java
deleted file mode 100644
index 847b73d..0000000
--- a/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.driver;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MLinkConfig;
-
-public class DriverConfigUpgrader extends RepositoryUpgrader{
-
- private static final Logger LOG = Logger.getLogger(DriverConfigUpgrader.class);
-
- @Override
- public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
- // NOTE(VB): There are no link configs anymore for driver, this code remains for previous versions
- }
-
- @Override
- public void upgrade(MConfigList original, MConfigList upgradeTarget) {
- doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
- }
-
- @SuppressWarnings("unchecked")
- private void doUpgrade(List<MConfig> original, List<MConfig> target) {
- // Easier to find the config in the original list if we use a map.
- // Since the constructor takes a list,
- // index is not guaranteed to be the same, so we need to look for
- // equivalence
- Map<String, MConfig> configMap = new HashMap<String, MConfig>();
- for (MConfig config : original) {
- configMap.put(config.getName(), config);
- }
- for (MConfig config : target) {
- List<MInput<?>> inputs = config.getInputs();
- MConfig originalConfig = configMap.get(config.getName());
- if(originalConfig == null) {
- LOG.warn("Config: " + config.getName() + " not present in old " +
- "driver config. So it will not be transferred by the upgrader.");
- continue;
- }
-
- for (MInput input : inputs) {
- try {
- MInput originalInput = originalConfig.getInput(input.getName());
- input.setValue(originalInput.getValue());
- } catch (SqoopException ex) {
- LOG.warn("Input: " + input.getName() + " not present in old " +
- "driver config. So it will not be transferred by the upgrader.");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java b/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
index 9c3b660..0d9a9b8 100644
--- a/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
@@ -17,7 +17,7 @@
*/
package org.apache.sqoop.driver;
-import org.apache.sqoop.driver.configuration.DriverConfiguration;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.driver.configuration.ThrottlingConfig;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.ConfigValidator;
@@ -26,8 +26,8 @@ import org.apache.sqoop.validation.Validator;
public class DriverConfigValidator extends Validator {
@Override
public ConfigValidator validateConfigForJob(Object jobConfiguration) {
- ConfigValidator validation = new ConfigValidator(DriverConfiguration.class);
- DriverConfiguration conf = (DriverConfiguration)jobConfiguration;
+ ConfigValidator validation = new ConfigValidator(JobConfiguration.class);
+ JobConfiguration conf = (JobConfiguration)jobConfiguration;
validateThrottlingConfig(validation,conf.throttlingConfig);
return validation;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java b/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java
new file mode 100644
index 0000000..b880d3b
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.driver;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.model.MDriverConfig;
+
+public class DriverUpgrader {
+
+ public void upgradeJobConfig(MDriverConfig original, MDriverConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index df2a5ab..51e562c 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -30,7 +30,7 @@ import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.driver.configuration.DriverConfiguration;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer;
@@ -306,9 +306,9 @@ public class JobManager implements Reconfigurable {
MLink toConnection = getLink(job.getLinkId(Direction.TO));
// get from/to connectors for the connection
- SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
+ SqoopConnector fromConnector = getSqoopConnector(fromConnection.getConnectorId());
validateSupportedDirection(fromConnector, Direction.FROM);
- SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
+ SqoopConnector toConnector = getSqoopConnector(toConnection.getConnectorId());
validateSupportedDirection(toConnector, Direction.TO);
// link config for the FROM part of the job
@@ -329,7 +329,7 @@ public class JobManager implements Reconfigurable {
// the only driver config for the job
Object driverConfig = ClassUtils
- .instantiate(Driver.getInstance().getDriverConfigurationGroupClass());
+ .instantiate(Driver.getInstance().getDriverJobConfigurationClass());
ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig);
@@ -402,8 +402,8 @@ public class JobManager implements Reconfigurable {
return summary;
}
- SqoopConnector getConnector(long connnectorId) {
- return ConnectorManager.getInstance().getConnector(connnectorId);
+ SqoopConnector getSqoopConnector(long connnectorId) {
+ return ConnectorManager.getInstance().getSqoopConnector(connnectorId);
}
void validateSupportedDirection(SqoopConnector connector, Direction direction) {
@@ -480,7 +480,7 @@ public class JobManager implements Reconfigurable {
}
void prepareJob(JobRequest request) {
- DriverConfiguration jobConfiguration = (DriverConfiguration) request.getDriverConfig();
+ JobConfiguration jobConfiguration = (JobConfiguration) request.getDriverConfig();
// We're directly moving configured number of extractors and loaders to
// underlying request object. In the future we might need to throttle this
// count based on other running jobs to meet our SLAs.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java
deleted file mode 100644
index d4e2254..0000000
--- a/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.driver.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Config;
-
-/**
- * Representing the core job configuration
- */
-@ConfigurationClass
-public class DriverConfiguration {
- @Config
- public ThrottlingConfig throttlingConfig;
-
- public DriverConfiguration() {
- throttlingConfig = new ThrottlingConfig();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
new file mode 100644
index 0000000..bf1328a
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Representing the driver job configuration
+ */
+@ConfigurationClass
+public class JobConfiguration {
+ @Config
+ public ThrottlingConfig throttlingConfig;
+
+ public JobConfiguration() {
+ throttlingConfig = new ThrottlingConfig();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index f06fd0c..476830d 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -163,10 +163,6 @@ public class JdbcRepository extends Repository {
handler.registerConnector(mConnector, conn);
return mConnector;
} else {
- // Same connector, check if the version is the same.
- // For now, use the "string" versions itself - later we should
- // probably include a build number or something that is
- // monotonically increasing.
if (connectorResult.getUniqueName().equals(mConnector.getUniqueName()) &&
mConnector.getVersion().compareTo(connectorResult.getVersion()) > 0) {
if (autoUpgrade) {
@@ -652,23 +648,23 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- protected void upgradeConnector(final MConnector newConnector,
+ protected void upgradeConnectorConfigs(final MConnector newConnector,
RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
- handler.upgradeConnector(newConnector, conn);
+ handler.upgradeConnectorConfigs(newConnector, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
}
- protected void upgradeDriver(final MDriver mDriver, RepositoryTransaction tx) {
+ protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
- handler.upgradeDriver(mDriver, conn);
+ handler.upgradeDriverConfigs(mDriver, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 5a8e026..4c5229f 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -101,7 +101,7 @@ public abstract class JdbcRepositoryHandler {
* @param conn JDBC link for querying repository
*/
- public abstract void upgradeConnector(MConnector mConnector, Connection conn);
+ public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn);
/**
@@ -117,7 +117,7 @@ public abstract class JdbcRepositoryHandler {
* the driverConfig.
* @param conn JDBC link for querying repository
*/
- public abstract void upgradeDriver(MDriver mDriver, Connection conn);
+ public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn);
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 74a9e12..8f78052 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -22,12 +22,12 @@ import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.driver.Driver;
+import org.apache.sqoop.driver.DriverUpgrader;
import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MConfig;
@@ -317,7 +317,7 @@ public abstract class Repository {
* method will not call begin, commit,
* rollback or close on this transaction.
*/
- protected abstract void upgradeConnector(MConnector newConnector, RepositoryTransaction tx);
+ protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx);
/**
* Upgrade the driver with the new data supplied in the
@@ -335,7 +335,7 @@ public abstract class Repository {
* method will not call begin, commit,
* rollback or close on this transaction.
*/
- protected abstract void upgradeDriver(MDriver newDriver, RepositoryTransaction tx);
+ protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx);
/**
* Delete all inputs for a job
@@ -388,84 +388,88 @@ public abstract class Repository {
LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
long connectorID = oldConnector.getPersistenceId();
newConnector.setPersistenceId(connectorID);
- /* Algorithms:
- * 1. Get an upgrader for the connector.
- * 2. Get all links associated with the connector.
- * 3. Get all jobs associated with the connector.
- * 4. Delete the inputs for all of the jobs and links (in that order)
- * 5. Remove all inputs and configs associated with the connector, and
- * register the new configs and inputs.
- * 6. Create new links and jobs with connector part being the ones
- * returned by the upgrader.
- * 7. Validate new links and jobs with connector's validator
- * 8. If any invalid links or jobs detected, throw an exception
- * and stop the bootup of Sqoop server
- * 9. Otherwise, Insert the link inputs followed by job inputs (using
- * updateJob and updatelink)
- */
+
RepositoryTransaction tx = null;
try {
- SqoopConnector connector =
- ConnectorManager.getInstance().getConnector(newConnector
- .getUniqueName());
+ SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
+ newConnector.getUniqueName());
Validator connectorConfigValidator = connector.getConfigValidator();
boolean upgradeSuccessful = true;
- RepositoryUpgrader upgrader = connector.getRepositoryUpgrader();
- List<MLink> linksByConnector = findLinksForConnector(connectorID);
- List<MJob> jobsByConnector = findJobsForConnector(connectorID);
+ // 1. Get an upgrader for the connector
+ ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader();
+ // 2. Get all links associated with the connector.
+ List<MLink> existingLinksByConnector = findLinksForConnector(connectorID);
+ // 3. Get all jobs associated with the connector.
+ List<MJob> existingJobsByConnector = findJobsForConnector(connectorID);
// -- BEGIN TXN --
tx = getTransaction();
tx.begin();
- deletelinksAndJobs(linksByConnector, jobsByConnector, tx);
- upgradeConnector(newConnector, tx);
- for (MLink oldLink : linksByConnector) {
- // Make a new copy of the configs
- List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
- MLinkConfig newLinkConfig = new MLinkConfig(linkConfig);
- MLinkConfig oldLinkConfig = oldLink.getConnectorLinkConfig();
- upgrader.upgrade(oldLinkConfig, newLinkConfig);
-
- MLink newlink = new MLink(oldLink, newLinkConfig);
-
- Object newConfigurationObject = ClassUtils.instantiate(connector.getLinkConfigurationClass());
- ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(), newConfigurationObject);
-
- ConfigValidator configValidator = connectorConfigValidator.validateConfigForLink(newConfigurationObject);
- if (configValidator.getStatus().canProceed()) {
- updateLink(newlink, tx);
- } else {
- logInvalidModelObject("link", newlink, configValidator);
- upgradeSuccessful = false;
+ // 4. Delete the inputs for all of the jobs and links (in that order) for
+ // this connector
+ deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx);
+ // 5. Delete all inputs and configs associated with the connector, and
+ // insert the new configs and inputs for this connector
+ upgradeConnectorConfigs(newConnector, tx);
+ // 6. Run upgrade logic for the configs related to the link objects
+ // dont always rely on the repository implementation to return empty list for links
+ if (existingLinksByConnector != null) {
+ for (MLink link : existingLinksByConnector) {
+ // Make a new copy of the configs
+ List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
+ MLinkConfig newLinkConfig = new MLinkConfig(linkConfig);
+ MLinkConfig oldLinkConfig = link.getConnectorLinkConfig();
+ upgrader.upgradeLinkConfig(oldLinkConfig, newLinkConfig);
+ MLink newlink = new MLink(link, newLinkConfig);
+
+ Object newConfigurationObject = ClassUtils.instantiate(connector
+ .getLinkConfigurationClass());
+ ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(),
+ newConfigurationObject);
+ // 7. Run link config validation
+ ConfigValidator configValidator = connectorConfigValidator
+ .validateConfigForLink(newConfigurationObject);
+ if (configValidator.getStatus().canProceed()) {
+ updateLink(newlink, tx);
+ } else {
+ // If any invalid links or jobs detected, throw an exception
+ // and stop the bootup of Sqoop server
+ logInvalidModelObject("link", newlink, configValidator);
+ upgradeSuccessful = false;
+ }
}
}
- for (MJob job : jobsByConnector) {
- // Make a new copy of the configs
- // else the values will get set in the configs in the connector for
- // each job.
- List<MConfig> fromConfig = newConnector.getConfig(Direction.FROM).clone(false).getConfigs();
- List<MConfig> toConfig = newConnector.getConfig(Direction.TO).clone(false).getConfigs();
-
- // New FROM direction configs, old TO direction configs.
- if (job.getConnectorId(Direction.FROM) == newConnector.getPersistenceId()) {
- MFromConfig newFromConfig = new MFromConfig(fromConfig);
- MFromConfig oldFromCOnfig = job.getFromJobConfig();
- upgrader.upgrade(oldFromCOnfig, newFromConfig);
-
- MToConfig oldToConfig = job.getToJobConfig();
- MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
- updateJob(newJob, tx);
- }
-
- // Old FROM direction configs, new TO direction configs.
- if (job.getConnectorId(Direction.TO) == newConnector.getPersistenceId()) {
-
- MToConfig oldToConfig = job.getToJobConfig();
- MToConfig newToConfig = new MToConfig(toConfig);
- upgrader.upgrade(oldToConfig, newToConfig);
- MFromConfig oldFromConfig = job.getFromJobConfig();
- MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
- updateJob(newJob, tx);
+ // 8. Run upgrade logic for the configs related to the job objects
+ if (existingJobsByConnector != null) {
+ for (MJob job : existingJobsByConnector) {
+ // every job has 2 parts, the FROM and the TO links and their
+ // corresponding connectors.
+ List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
+ if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
+ MFromConfig newFromConfig = new MFromConfig(fromConfig);
+ MFromConfig oldFromCOnfig = job.getFromJobConfig();
+ upgrader.upgradeFromJobConfig(oldFromCOnfig, newFromConfig);
+ MToConfig oldToConfig = job.getToJobConfig();
+ // create a job with new FROM direction configs but old TO direction
+ // configs
+ MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
+ // TODO( jarcec) : will add the job config validation logic similar
+ // to the link config validation before updating job
+ updateJob(newJob, tx);
+ }
+ List<MConfig> toConfig = newConnector.getToConfig().clone(false).getConfigs();
+ if (job.getToConnectorId() == newConnector.getPersistenceId()) {
+ MToConfig oldToConfig = job.getToJobConfig();
+ MToConfig newToConfig = new MToConfig(toConfig);
+ upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
+ MFromConfig oldFromConfig = job.getFromJobConfig();
+ // create a job with old FROM direction configs but new TO direction
+ // configs
+ MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
+ // TODO( jarcec) : will add the job config validation logic similar
+ // to the link config validation before updating job
+ updateJob(newJob, tx);
+ }
}
}
@@ -475,20 +479,20 @@ public abstract class Repository {
throw new SqoopException(RepositoryError.JDBCREPO_0027);
}
} catch (SqoopException ex) {
- if(tx != null) {
+ if (tx != null) {
tx.rollback();
}
throw ex;
} catch (Exception ex) {
- if(tx != null) {
+ if (tx != null) {
tx.rollback();
}
throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
} finally {
- if(tx != null) {
+ if (tx != null) {
tx.close();
}
- LOG.info("Metadata upgrade finished for connector: " + oldConnector.getUniqueName());
+ LOG.info("Connector upgrade finished: " + oldConnector.getUniqueName());
}
}
@@ -496,31 +500,38 @@ public abstract class Repository {
LOG.info("Upgrading driver");
RepositoryTransaction tx = null;
try {
- RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
- List<MJob> jobs = findJobs();
-
+ //1. find upgrader
+ DriverUpgrader upgrader = Driver.getInstance().getConfigurableUpgrader();
+ //2. find all jobs in the system
+ List<MJob> existingJobs = findJobs();
Validator validator = Driver.getInstance().getValidator();
boolean upgradeSuccessful = true;
// -- BEGIN TXN --
tx = getTransaction();
tx.begin();
- deleteJobs(jobs, tx);
- upgradeDriver(driver, tx);
+ //3. delete all jobs in the system
+ deleteJobs(existingJobs, tx);
+ // 4. Delete all inputs and configs associated with the driver, and
+ // insert the new configs and inputs for this driver
+ upgradeDriverConfigs(driver, tx);
- for (MJob job : jobs) {
+ for (MJob job : existingJobs) {
// Make a new copy of the configs
MDriverConfig driverConfig = driver.getDriverConfig().clone(false);
MDriver newDriver = new MDriver(driverConfig, DriverBean.CURRENT_DRIVER_VERSION);
- upgrader.upgrade(job.getDriverConfig(), newDriver.getDriverConfig());
+ // At this point, the driver only supports JOB config type
+ upgrader.upgradeJobConfig(job.getDriverConfig(), newDriver.getDriverConfig());
+ // create a new job with old FROM and TO configs but new driver configs
MJob newJob = new MJob(job, job.getFromJobConfig(), job.getToJobConfig(), newDriver.getDriverConfig());
- // Transform config structures to objects for validations
- Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverConfigurationGroupClass());
+ Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass());
ConfigUtils.fromConfigs(newJob.getDriverConfig().getConfigs(), newConfigurationObject);
+ // 5. validate configs
ConfigValidator validation = validator.validateConfigForJob(newConfigurationObject);
if (validation.getStatus().canProceed()) {
+ // 6. update job
updateJob(newJob, tx);
} else {
logInvalidModelObject("job", newJob, validation);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java b/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
index dc4e8c8..e5201fc 100644
--- a/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
+++ b/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MIntegerInput;
import org.apache.sqoop.model.MStringInput;
@@ -36,30 +37,31 @@ import org.junit.Test;
*/
public class TestDriverConfigUpgrader {
- DriverConfigUpgrader upgrader;
+ DriverUpgrader upgrader;
@Before
public void initializeUpgrader() {
- upgrader = new DriverConfigUpgrader();
+ upgrader = new DriverUpgrader();
}
+
/**
- * We take the same configs on input and output and we
- * expect that all values will be correctly transferred.
+ * We take the same configs on input and output and we expect that all values
+ * will be correctly transferred.
*/
@Test
public void testJobConfigTyeUpgrade() {
- MConfigList original = job();
- MConfigList target = job();
+ MDriverConfig original = job();
+ MDriverConfig target = job();
original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3);
- upgrader.upgrade(original, target);
+ upgrader.upgradeJobConfig(original, target);
assertEquals("A", target.getStringInput("f1.s1").getValue());
assertEquals("B", target.getStringInput("f1.s2").getValue());
- assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
+ assertEquals(3, (long) target.getIntegerInput("f1.i").getValue());
}
/**
@@ -67,54 +69,54 @@ public class TestDriverConfigUpgrader {
*/
@Test
public void testNonExistingInput() {
- MConfigList original = job1();
- MConfigList target = job2();
+ MDriverConfig original = job1();
+ MDriverConfig target = job2();
original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3);
- upgrader.upgrade(original, target);
+ upgrader.upgradeJobConfig(original, target);
assertEquals("A", target.getStringInput("f1.s1").getValue());
assertNull(target.getStringInput("f1.s2_").getValue());
- assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
+ assertEquals(3, (long) target.getIntegerInput("f1.i").getValue());
}
/**
- * Upgrade scenario when entire has been added in the target and
- * therefore is missing in the original.
+ * Upgrade scenario when entire has been added in the target and therefore is
+ * missing in the original.
*/
@Test
public void testNonExistingConfig() {
- MConfigList original = job1();
- MConfigList target = job3();
+ MDriverConfig original = job1();
+ MDriverConfig target = job3();
original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3);
- upgrader.upgrade(original, target);
+ upgrader.upgradeJobConfig(original, target);
assertNull(target.getStringInput("f2.s1").getValue());
assertNull(target.getStringInput("f2.s2").getValue());
assertNull(target.getIntegerInput("f2.i").getValue());
}
- MConfigList job() {
- return new MConfigList(configs1());
+ MDriverConfig job() {
+ return new MDriverConfig(configs1());
}
- MConfigList job1() {
- return new MConfigList(configs1());
+ MDriverConfig job1() {
+ return new MDriverConfig(configs1());
}
- MConfigList job2() {
- return new MConfigList(configs2());
+ MDriverConfig job2() {
+ return new MDriverConfig(configs2());
}
- MConfigList job3() {
- return new MConfigList(configs3());
+ MDriverConfig job3() {
+ return new MDriverConfig(configs3());
}
List<MConfig> configs1() {
@@ -125,8 +127,8 @@ public class TestDriverConfigUpgrader {
List<MInput<?>> inputs1(String formName) {
List<MInput<?>> list = new LinkedList<MInput<?>>();
- list.add(new MStringInput(formName + ".s1", false, (short)30));
- list.add(new MStringInput(formName + ".s2", false, (short)30));
+ list.add(new MStringInput(formName + ".s1", false, (short) 30));
+ list.add(new MStringInput(formName + ".s2", false, (short) 30));
list.add(new MIntegerInput(formName + ".i", false));
return list;
}
@@ -139,8 +141,8 @@ public class TestDriverConfigUpgrader {
List<MInput<?>> inputs2(String formName) {
List<MInput<?>> list = new LinkedList<MInput<?>>();
- list.add(new MStringInput(formName + ".s1", false, (short)30));
- list.add(new MStringInput(formName + ".s2_", false, (short)30));
+ list.add(new MStringInput(formName + ".s1", false, (short) 30));
+ list.add(new MStringInput(formName + ".s2_", false, (short) 30));
list.add(new MIntegerInput(formName + ".i", false));
return list;
}
@@ -150,4 +152,4 @@ public class TestDriverConfigUpgrader {
list.add(new MConfig("f2", inputs1("f2")));
return list;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
index 3b475c6..5bc1b03 100644
--- a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
+++ b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
@@ -71,10 +71,10 @@ public class TestJobManager {
@Test
public void testGetConnector() {
- when(connectorMgrMock.getConnector(123l)).thenReturn(sqoopConnectorMock);
+ when(connectorMgrMock.getSqoopConnector(123l)).thenReturn(sqoopConnectorMock);
when(sqoopConnectorMock.getSupportedDirections()).thenReturn(getSupportedDirections());
- assertEquals(jobManager.getConnector(123l), sqoopConnectorMock);
- verify(connectorMgrMock, times(1)).getConnector(123l);
+ assertEquals(jobManager.getSqoopConnector(123l), sqoopConnectorMock);
+ verify(connectorMgrMock, times(1)).getSqoopConnector(123l);
}
@Test