You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/08/20 03:41:24 UTC

[2/2] sqoop git commit: SQOOP-2461: Sqoop2: Add MySQL support for the metadata repository

SQOOP-2461: Sqoop2: Add MySQL support for the metadata repository

(Colin Ma via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/668703cf
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/668703cf
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/668703cf

Branch: refs/heads/sqoop2
Commit: 668703cfeb204640fa76aabdb406c46920a9fe27
Parents: 1c24ecb
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Wed Aug 19 18:40:29 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Wed Aug 19 18:40:29 2015 -0700

----------------------------------------------------------------------
 .../sqoop/common/test/db/DatabaseProvider.java  |   9 +
 .../sqoop/common/test/db/MySQLProvider.java     |  29 +-
 .../apache/sqoop/error/code/MySqlRepoError.java |  49 +++
 pom.xml                                         |   5 +
 repository/pom.xml                              |   1 +
 .../repository/common/CommonRepoUtils.java      |   4 +
 repository/repository-mysql/pom.xml             | 124 ++++++
 .../repository/mysql/MySqlRepoConstants.java    |  34 ++
 .../mysql/MySqlRepositoryHandler.java           | 228 +++++++++++
 .../mysql/MySqlSchemaCreateQuery.java           | 297 ++++++++++++++
 .../repository/mysql/MySqlSchemaQuery.java      |  47 +++
 ...RepositoryInsertUpdateDeleteSelectQuery.java |  59 +++
 .../repository/mysql/MySqlTestCase.java         | 186 +++++++++
 .../repository/mysql/MySqlTestUtils.java        | 105 +++++
 .../repository/mysql/TestConnectorHandling.java | 166 ++++++++
 .../repository/mysql/TestDriverHandling.java    |  89 ++++
 .../repository/mysql/TestHandler.java           |  39 ++
 .../repository/mysql/TestJobHandling.java       | 302 ++++++++++++++
 .../repository/mysql/TestLinkHandling.java      | 297 ++++++++++++++
 .../repository/mysql/TestStructure.java         |  81 ++++
 .../mysql/TestSubmissionHandling.java           | 406 +++++++++++++++++++
 server/pom.xml                                  |   5 +
 22 files changed, 2560 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
index a6ae490..4f4d347 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
@@ -467,4 +467,13 @@ abstract public class DatabaseProvider {
 
     return escapeSchemaName(tableName.getSchemaName()) + "." + escapeTableName(tableName.getTableName());
   }
+
+  /**
+   * Drop database, this should be implemented for the DatabaseProvider like:
+   * MySqlProvider.
+   *
+   * @param databaseName
+   */
+  public void dropDatabase(String databaseName) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
index 3083ee6..cef59bb 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.common.test.db;
 
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.test.db.types.DatabaseTypeList;
 import org.apache.sqoop.common.test.db.types.MySQLTypeList;
 
@@ -27,6 +28,7 @@ import org.apache.sqoop.common.test.db.types.MySQLTypeList;
  * on the same box (localhost) that is access via sqoop/sqoop credentials.
  */
 public class MySQLProvider extends DatabaseProvider {
+  private static final Logger LOG = Logger.getLogger(MySQLProvider.class);
 
   public static final String DRIVER = "com.mysql.jdbc.Driver";
 
@@ -70,9 +72,19 @@ public class MySQLProvider extends DatabaseProvider {
     return escape(tableName);
   }
 
+  public String escapeDatabaseName(String databaseName) {
+    return escape(databaseName);
+  }
+
+  // the scheme name is the same as database name.
+  @Override
+  public boolean isSupportingScheme() {
+    return true;
+  }
+
   @Override
   public String escapeValueString(String value) {
-    return "\"" + value + "\"";
+    return escape(value);
   }
 
   @Override
@@ -84,7 +96,20 @@ public class MySQLProvider extends DatabaseProvider {
   public DatabaseTypeList getDatabaseTypes() {
     return new MySQLTypeList();
   }
+
+  @Override
+  public void dropDatabase(String databaseName) {
+    StringBuilder sb = new StringBuilder("DROP DATABASE ");
+    sb.append(escapeDatabaseName(databaseName));
+
+    try {
+      executeUpdate(sb.toString());
+    } catch (RuntimeException e) {
+      LOG.info("Ignoring exception: " + e);
+    }
+  }
+
   public String escape(String entity) {
-    return "`" + entity + "`";
+    return "\"" + entity + "\"";
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java b/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java
new file mode 100644
index 0000000..399f741
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.error.code;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum MySqlRepoError implements ErrorCode {
+
+  /** An unknown error has occurred. */
+  MYSQLREPO_0000("An unknown error has occurred"),
+
+  /** The MySQL Repository handler was unable to add directions. */
+  MYSQLREPO_0001("Could not add directions"),
+
+  /** The system was unable to get ID of recently added direction. */
+  MYSQLREPO_0002("Could not get ID of recently added direction"),
+
+  ;
+
+  private final String message;
+
+  private MySqlRepoError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b1dbdc3..25ad00c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -304,6 +304,11 @@ limitations under the License.
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.sqoop.repository</groupId>
+        <artifactId>sqoop-repository-mysql</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.sqoop</groupId>
         <artifactId>connector-sdk</artifactId>
         <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index c63595c..6a9fbfd 100644
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -36,6 +36,7 @@ limitations under the License.
     <module>repository-common</module>
     <module>repository-derby</module>
     <module>repository-postgresql</module>
+    <module>repository-mysql</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
----------------------------------------------------------------------
diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
index 73293c0..df41fb1 100644
--- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
+++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
@@ -39,6 +39,10 @@ public class CommonRepoUtils {
     return QUOTE_CHARACTER + schemaName + QUOTE_CHARACTER;
   }
 
+  public static final String escapeDatabaseName(String databaseName) {
+    return QUOTE_CHARACTER + databaseName + QUOTE_CHARACTER;
+  }
+
   public static final String escapeConstraintName(String constraintName) {
     return QUOTE_CHARACTER + constraintName + QUOTE_CHARACTER;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/pom.xml b/repository/repository-mysql/pom.xml
new file mode 100644
index 0000000..15e909e
--- /dev/null
+++ b/repository/repository-mysql/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>repository</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.repository</groupId>
+  <artifactId>sqoop-repository-mysql</artifactId>
+  <name>Sqoop MySQL Repository</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-common-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+    </dependency>
+	<dependency>
+      <groupId>org.apache.sqoop.repository</groupId>
+      <artifactId>sqoop-repository-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludedGroups>mysql</excludedGroups>
+          <excludes>
+            <exclude>**/integration/**</exclude>
+          </excludes>
+          <systemPropertyVariables>
+            <sqoop.integration.tmpdir>${project.build.directory}</sqoop.integration.tmpdir>
+          </systemPropertyVariables>
+        </configuration>
+        <executions>
+          <execution>
+            <id>integration-test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <phase>integration-test</phase>
+            <configuration>
+              <excludes>
+                <exclude>none</exclude>
+              </excludes>
+              <includes>
+                <include>**/integration/**</include>
+              </includes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>mysql</id>
+      <activation>
+        <property>
+          <name>mysql</name>
+        </property>
+      </activation>
+
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <groups>mysql</groups>
+              <excludedGroups>none</excludedGroups>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java
new file mode 100644
index 0000000..c5fd241
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+public class MySqlRepoConstants {
+  public static final String CONF_PREFIX_MYSQL = "mysql.";
+
+  /**
+   * Expected version of the repository structures.
+   *
+   * History:
+   * 1 - Version
+   */
+  public static final int LATEST_MYSQL_REPOSITORY_VERSION = 1;
+
+  private MySqlRepoConstants() {
+    // Disable explicit object creation
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java
new file mode 100644
index 0000000..61b058a
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.error.code.MySqlRepoError;
+import org.apache.sqoop.repository.JdbcRepositoryContext;
+import org.apache.sqoop.repository.common.CommonRepoConstants;
+import org.apache.sqoop.repository.common.CommonRepositoryHandler;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+
+/**
+ * JDBC based repository handler for MySQL database.
+ *
+ * Repository implementation for MySQL database.
+ */
+public class MySqlRepositoryHandler extends CommonRepositoryHandler {
+
+  private static final Logger LOG =
+      Logger.getLogger(MySqlRepositoryHandler.class);
+
+  private JdbcRepositoryContext repoContext;
+
+  public MySqlRepositoryHandler() {
+    crudQueries = new MysqlRepositoryInsertUpdateDeleteSelectQuery();
+  }
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String name() {
+    return "MySql";
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void initialize(JdbcRepositoryContext ctx) {
+    repoContext = ctx;
+    LOG.info("MySqlRepositoryHandler initialized.");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void shutdown() {
+  }
+
+  /**
+   * Detect version of underlying database structures
+   *
+   * @param conn
+   *          JDBC Connection
+   * @return
+   */
+  public int detectRepositoryVersion(Connection conn) {
+    ResultSet rs = null, metadataResultSet = null;
+    PreparedStatement stmt = null;
+
+    // Select and return the version
+    try {
+      DatabaseMetaData md = conn.getMetaData();
+      metadataResultSet = md.getTables(null,
+          CommonRepositorySchemaConstants.SCHEMA_SQOOP,
+          CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME, null);
+
+      if (metadataResultSet.next()) {
+        stmt = conn.prepareStatement(MySqlSchemaQuery.STMT_SELECT_SYSTEM);
+        stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
+        rs = stmt.executeQuery();
+
+        if (!rs.next()) {
+          return 0;
+        }
+
+        return rs.getInt(1);
+      }
+    } catch (SQLException e) {
+      LOG.info("Can't fetch repository structure version.", e);
+      return 0;
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+
+    return 0;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void createOrUpgradeRepository(Connection conn) {
+
+    int version = detectRepositoryVersion(conn);
+    LOG.info("Detected repository version: " + version);
+
+    if (version == MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION) {
+      return;
+    }
+
+    if (version < 1) {
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_DATABASE_SQOOP, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_INPUT, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_INPUT_RELATION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_LINK, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_JOB, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_SYSTEM, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT, conn);
+
+      // Insert FROM and TO directions.
+      insertDirections(conn);
+    }
+
+    ResultSet rs = null;
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn
+          .prepareStatement(MySqlSchemaQuery.STMT_INSERT_ON_DUPLICATE_KEY_SYSTEM);
+      stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
+      stmt.setString(2,
+          Integer.toString(MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION));
+      stmt.setString(3,
+          Integer.toString(MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION));
+      stmt.executeUpdate();
+    } catch (SQLException e) {
+      LOG.error("Can't persist the repository version", e);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * Insert directions: FROM and TO.
+   *
+   * @param conn
+   * @return Map<Direction, Long> direction ID => Direction
+   */
+  protected Map<Direction, Long> insertDirections(Connection conn) {
+    // Add directions
+    Map<Direction, Long> directionMap = new TreeMap<Direction, Long>();
+    PreparedStatement insertDirectionStmt = null;
+    try {
+      // Insert directions and get IDs.
+      for (Direction direction : Direction.values()) {
+        insertDirectionStmt = conn.prepareStatement(
+            MySqlSchemaQuery.STMT_INSERT_DIRECTION,
+            Statement.RETURN_GENERATED_KEYS);
+        insertDirectionStmt.setString(1, direction.toString());
+        if (insertDirectionStmt.executeUpdate() != 1) {
+          throw new SqoopException(MySqlRepoError.MYSQLREPO_0001,
+              "Could not add directions FROM and TO.");
+        }
+
+        ResultSet directionId = insertDirectionStmt.getGeneratedKeys();
+        if (directionId.next()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Loaded direction: " + directionId.getLong(1));
+          }
+
+          directionMap.put(direction, directionId.getLong(1));
+        } else {
+          throw new SqoopException(MySqlRepoError.MYSQLREPO_0002,
+              "Could not get ID of direction " + direction);
+        }
+      }
+    } catch (SQLException e) {
+      throw new SqoopException(MySqlRepoError.MYSQLREPO_0000, e);
+    } finally {
+      closeStatements(insertDirectionStmt);
+    }
+
+    return directionMap;
+  }
+
+  @Override
+  public boolean isRepositorySuitableForUse(Connection conn) {
+    int version = detectRepositoryVersion(conn);
+
+    if (version != MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION) {
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java
new file mode 100644
index 0000000..46493a3
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.SCHEMA_SQOOP;
+
+import org.apache.sqoop.repository.common.CommonRepoUtils;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+
+public class MySqlSchemaCreateQuery {
+
+  public static final String QUERY_CREATE_DATABASE_SQOOP = "CREATE DATABASE " + CommonRepoUtils.escapeDatabaseName(SCHEMA_SQOOP);
+
+  public static final String QUERY_CREATE_TABLE_SQ_SYSTEM =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQM_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQM_KEY) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQM_VALUE) + " VARCHAR(64) "
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_NAME) + " VARCHAR(64)"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONFIGURABLE =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_NAME) + " VARCHAR(64),"
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_TYPE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_CLASS) + " VARCHAR(255), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_VERSION) + " VARCHAR(64), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CONFIGURABLE_UNIQUE_NAME)
+          + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_NAME) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONNECTOR_DIRECTIONS_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_CONNECTOR) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_DIRECTION) + " BIGINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCD_SQC_NAME)
+          + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_CONNECTOR) + ") REFERENCES "
+          + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME) + "("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCD_SQD_NAME)
+          + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_DIRECTION) + ") REFERENCES "
+          + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME) + "("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_CONFIGURABLE) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_NAME) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_TYPE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_INDEX) + " SMALLINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CFG_SQC_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_CONFIGURABLE) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + "),"
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CONFIG_UNIQUE_NAME_TYPE_CONFIGURABLE)
+            + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_NAME) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_TYPE) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_CONFIGURABLE) + ") "
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_DIRECTIONS_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_CONFIG) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_DIRECTION) + " BIGINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_CONFIG) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_DIRECTION) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_INPUT =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_NAME) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_INDEX) + " SMALLINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_TYPE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_STRMASK) + " BOOLEAN, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_STRLENGTH) + " SMALLINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_EDITABLE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ENUMVALS) + " VARCHAR(100), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQI_SQ_CFG_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_INPUT_UNIQUE_NAME_TYPE_CONFIG)
+            + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_NAME) + ", "
+                          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_TYPE) + ", "
+                          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG) + ") "
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_INPUT_RELATION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_RELATION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQIR_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQIR_PARENT) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQIR_CHILD) + " BIGINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQIR_PARENT_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQIR_PARENT + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQI_ID + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQIR_CHILD_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQIR_CHILD + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQI_ID + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_LINK =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CONFIGURABLE) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_NAME) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CREATION_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CREATION_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_UPDATE_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_UPDATE_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ENABLED) + " BOOLEAN DEFAULT TRUE, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME)
+          + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_NAME) + "),"
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNK_SQC_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CONFIGURABLE) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_JOB =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_FROM_LINK) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_TO_LINK) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_NAME) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_CREATION_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_CREATION_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_UPDATE_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_UPDATE_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ENABLED) + " BOOLEAN DEFAULT TRUE, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQB_NAME_UNIQUE_NAME)
+          + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_NAME) + "),"
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQB_SQ_LNK_FROM_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_FROM_LINK) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQB_SQ_LNK_TO_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_TO_LINK) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_LINK_INPUT =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_INPUT_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_LINK) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_INPUT) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_VALUE) + " VARCHAR(1000), "
+          + "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_LINK) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_INPUT) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNKI_SQ_LNK_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_LINK) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNKI_SQI_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_INPUT) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_INPUT_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_JOB) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_INPUT) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_VALUE) + " VARCHAR(1000), "
+          + "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_JOB) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_INPUT) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQBI_SQB_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_JOB) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQBI_SQI_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_INPUT) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_JOB) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_STATUS) + " VARCHAR(20), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_CREATION_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_CREATION_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_UPDATE_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_UPDATE_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_EXTERNAL_ID) + " VARCHAR(50), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_EXTERNAL_LINK) + " VARCHAR(150), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ERROR_SUMMARY) + " VARCHAR(150), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ERROR_DETAILS) + " VARCHAR(750), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQS_SQB_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_JOB) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ID) + ") ON DELETE CASCADE"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_GROUP_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQG_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQG_NAME) + " VARCHAR(75) UNIQUE"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQR_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQR_NAME) + " VARCHAR(75) UNIQUE"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_SUBMISSION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_GROUP) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_COUNTER) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_SUBMISSION) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_VALUE) + " BIGINT, "
+          + "PRIMARY KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_GROUP + ", " + CommonRepositorySchemaConstants.COLUMN_SQRS_COUNTER + ", " + CommonRepositorySchemaConstants.COLUMN_SQRS_SUBMISSION + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQRS_SQG_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_GROUP + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_GROUP_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQG_ID + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQRS_SQR_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_COUNTER + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQR_ID + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQRS_SQS_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_SUBMISSION + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQS_ID + ") ON DELETE CASCADE"
+          + ")";
+
+   // DDL: Create table SQ_CONTEXT_TYPE
+   public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE =
+       "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE) + " ("
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_NAME) + " VARCHAR(25) UNIQUE"
+           + ")";
+
+   // DDL: Create table SQ_CONTEXT_PROPERTY
+   public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY =
+       "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY) + " ("
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_NAME) + " VARCHAR(500) UNIQUE"
+           + ")";
+
+   // DDL: Create table SQ_CONTEXT
+   public static final String QUERY_CREATE_TABLE_SQ_CONTEXT =
+       "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT) + " ("
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + " BIGINT, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + " BIGINT, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + " BIGINT, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_VALUE) + " VARCHAR(500), "
+       + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQS_ID) + " "
+         + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + ") "
+           + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME)
+           + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + ") ON DELETE CASCADE, "
+       + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCT_ID) + " "
+         + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + ") "
+           + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE)
+           + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + "), "
+       + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCP_ID) + " "
+         + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + ") "
+           + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY)
+           + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + ") "
+       + ")";
+
+  private MySqlSchemaCreateQuery() {
+    // Disable explicit object creation
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java
new file mode 100644
index 0000000..7394a83
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import org.apache.sqoop.repository.common.CommonRepoUtils;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+
+/**
+ * DML for MySql repository.
+ */
+public final class MySqlSchemaQuery {
+
+  public static final String STMT_SELECT_SYSTEM =
+      "SELECT "
+          + CommonRepositorySchemaConstants.COLUMN_SQM_VALUE
+          + " FROM " + CommonRepoUtils.getTableName(CommonRepositorySchemaConstants.SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME)
+          + " WHERE " + CommonRepositorySchemaConstants.COLUMN_SQM_KEY + " = ?";
+
+  public static final String STMT_INSERT_ON_DUPLICATE_KEY_SYSTEM =
+      "INSERT INTO " + CommonRepoUtils.getTableName(CommonRepositorySchemaConstants.SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME) + "("
+          + CommonRepositorySchemaConstants.COLUMN_SQM_KEY + ", "
+          + CommonRepositorySchemaConstants.COLUMN_SQM_VALUE + ") "
+          + "VALUES(?, ?) ON DUPLICATE KEY UPDATE " + CommonRepositorySchemaConstants.COLUMN_SQM_VALUE + " = ?";
+
+  public static final String STMT_INSERT_DIRECTION =
+      "INSERT INTO " + CommonRepoUtils.getTableName(CommonRepositorySchemaConstants.SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME)
+          + " (" + CommonRepositorySchemaConstants.COLUMN_SQD_NAME+ ") VALUES (?)";
+
+  private MySqlSchemaQuery() {
+    // disable explicit object creation
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java
new file mode 100644
index 0000000..4c295c0
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_EDITABLE;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_ENUMVALS;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_ID;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_INDEX;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_NAME;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_STRLENGTH;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_STRMASK;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_TYPE;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.SCHEMA_SQOOP;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME;
+
+import org.apache.sqoop.repository.common.CommonRepoUtils;
+import org.apache.sqoop.repository.common.CommonRepositoryInsertUpdateDeleteSelectQuery;
+
+public class MysqlRepositoryInsertUpdateDeleteSelectQuery extends
+    CommonRepositoryInsertUpdateDeleteSelectQuery {
+
+  // DML: Get inputs for a given config
+  private static final String STMT_SELECT_INPUT = "SELECT "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_ID) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_NAME) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_CONFIG) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_INDEX) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_TYPE) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_STRMASK) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_STRLENGTH) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_EDITABLE) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_ENUMVALS) + ", "
+      + "cast(null as char(100))" + " FROM "
+      + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_INPUT_NAME)
+      + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_CONFIG)
+      + " = ?" + " ORDER BY "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_INDEX);
+
+  @Override
+  public String getStmtSelectInput() {
+    return STMT_SELECT_INPUT;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java
new file mode 100644
index 0000000..0bb3c63
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.test.db.DatabaseProvider;
+import org.apache.sqoop.common.test.db.MySQLProvider;
+import org.apache.sqoop.json.DriverBean;
+import org.apache.sqoop.model.InputEditable;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MMapInput;
+import org.apache.sqoop.model.MStringInput;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.model.MToConfig;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+import org.apache.sqoop.repository.mysql.MySqlRepositoryHandler;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Abstract class with convenience methods for testing mysql repository.
+ */
+abstract public class MySqlTestCase extends TestCase {
+
+  public static DatabaseProvider provider;
+  public static MySqlTestUtils utils;
+  public MySqlRepositoryHandler handler;
+
+  @BeforeClass
+  public void setUpClass() {
+    provider = new MySQLProvider();
+    utils = new MySqlTestUtils(provider);
+  }
+
+  @BeforeMethod(alwaysRun = true)
+  public void setUp() throws Exception {
+    provider.start();
+
+    handler = new MySqlRepositoryHandler();
+    handler.createOrUpgradeRepository(provider.getConnection());
+    utils.setDatabase(CommonRepositorySchemaConstants.SCHEMA_SQOOP);
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void tearDown() throws Exception {
+    provider.dropDatabase("SQOOP");
+    provider.stop();
+  }
+
+  protected MConnector getConnector(String name, String className,
+      String version, boolean from, boolean to) {
+    return new MConnector(name, className, version, getLinkConfig(),
+        from ? getFromConfig() : null, to ? getToConfig() : null);
+  }
+
+  protected MDriver getDriver() {
+    return new MDriver(getDriverConfig(), DriverBean.CURRENT_DRIVER_VERSION);
+  }
+
+  protected MLink getLink(String name, MConnector connector) {
+    MLink link = new MLink(connector.getPersistenceId(),
+        connector.getLinkConfig());
+    link.setName(name);
+    fillLink(link);
+    return link;
+  }
+
+  protected MJob getJob(String name, MConnector connectorA,
+      MConnector connectorB, MLink linkA, MLink linkB) {
+    MDriver driver = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection());
+    MJob job = new MJob(connectorA.getPersistenceId(),
+        connectorB.getPersistenceId(), linkA.getPersistenceId(),
+        linkB.getPersistenceId(), connectorA.getFromConfig(),
+        connectorB.getToConfig(), driver.getDriverConfig());
+    job.setName(name);
+    fillJob(job);
+
+    return job;
+  }
+
+  protected MSubmission getSubmission(MJob job,
+      SubmissionStatus submissionStatus) {
+    MSubmission submission = new MSubmission(job.getPersistenceId(),
+        new Date(), submissionStatus);
+    fillSubmission(submission);
+    return submission;
+  }
+
+  protected void fillLink(MLink link) {
+    List<MConfig> configs = link.getConnectorLinkConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+  }
+
+  protected void fillJob(MJob job) {
+    List<MConfig> configs = job.getFromJobConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+
+    configs = job.getToJobConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+
+    configs = job.getDriverConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+  }
+
+  protected void fillSubmission(MSubmission submission) {
+    Counters counters = new Counters();
+    counters.addCounterGroup(new CounterGroup("test-1"));
+    counters.addCounterGroup(new CounterGroup("test-2"));
+    submission.setCounters(counters);
+  }
+
+  protected MLinkConfig getLinkConfig() {
+    return new MLinkConfig(getConfigs("l1", "l2"));
+  }
+
+  protected MFromConfig getFromConfig() {
+    return new MFromConfig(getConfigs("from1", "from2"));
+  }
+
+  protected MToConfig getToConfig() {
+    return new MToConfig(getConfigs("to1", "to2"));
+  }
+
+  protected MDriverConfig getDriverConfig() {
+    return new MDriverConfig(getConfigs("d1", "d2"));
+  }
+
+  protected List<MConfig> getConfigs(String configName1, String configName2) {
+    List<MConfig> configs = new LinkedList<MConfig>();
+
+    List<MInput<?>> inputs = new LinkedList<MInput<?>>();
+    MInput<?> input = new MStringInput("I1", false, InputEditable.ANY,
+        StringUtils.EMPTY, (short) 30);
+    inputs.add(input);
+    input = new MMapInput("I2", false, InputEditable.ANY, "I1");
+    inputs.add(input);
+    configs.add(new MConfig(configName1, inputs));
+
+    inputs = new LinkedList<MInput<?>>();
+    input = new MStringInput("I3", false, InputEditable.ANY, "I4", (short) 30);
+    inputs.add(input);
+    input = new MMapInput("I4", false, InputEditable.ANY, "I3");
+    inputs.add(input);
+    configs.add(new MConfig(configName2, inputs));
+
+    return configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java
new file mode 100644
index 0000000..3a16135
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.sqoop.common.test.db.DatabaseProvider;
+
+public class MySqlTestUtils {
+
+  private DatabaseProvider provider;
+
+  public MySqlTestUtils(DatabaseProvider provider) {
+    this.provider = provider;
+  }
+
+  public void setDatabase(String database) throws Exception {
+    provider.getConnection().setCatalog(database);
+  }
+
+  public void assertTableExists(String database, String table) throws Exception {
+    DatabaseMetaData md = provider.getConnection().getMetaData();
+    ResultSet rs = md.getTables(null, database, table, null);
+    while (rs.next()) {
+      if (rs.getString(3).equals(table)) {
+        return;
+      }
+    }
+
+    throw new AssertionError("Could not find table '" + table
+        + "' part of database '" + database + "'");
+  }
+
+  public void assertForeignKey(String database, String table, String column,
+      String foreignKeyTable, String foreignKeyColumn) throws Exception {
+    DatabaseMetaData md = provider.getConnection().getMetaData();
+    ResultSet rs = md.getCrossReference(null, database, table, null, database,
+        foreignKeyTable);
+    while (rs.next()) {
+      if (rs.getString(4).equals(column)
+          && rs.getString(8).equals(foreignKeyColumn)) {
+        return;
+      }
+    }
+
+    throw new AssertionError("Could not find '" + table + "." + column
+        + "' part of database '" + database + "' with reference to '" + table
+        + "." + column + "'");
+  }
+
+  public void assertUniqueConstraints(String database, String table,
+      String... columns) throws Exception {
+    Set<String> columnSet = new TreeSet<String>();
+    Map<String, Set<String>> indexColumnMap = new HashMap<String, Set<String>>();
+
+    for (String column : columns) {
+      columnSet.add(column);
+    }
+
+    DatabaseMetaData md = provider.getConnection().getMetaData();
+    ResultSet rs = md.getIndexInfo(null, database, table, true, false);
+
+    // Get map of index => columns
+    while (rs.next()) {
+      String indexName = rs.getString(6);
+      String columnName = rs.getString(9);
+      if (!indexColumnMap.containsKey(indexName)) {
+        indexColumnMap.put(indexName, new TreeSet<String>());
+      }
+      indexColumnMap.get(indexName).add(columnName);
+    }
+
+    // Validate unique constraints
+    for (String index : indexColumnMap.keySet()) {
+      if (indexColumnMap.get(index).equals(columnSet)) {
+        return;
+      }
+    }
+
+    throw new AssertionError("Could not find unique constraint on table '"
+        + table + "' part of database '" + database
+        + "' with reference to columns '" + columnSet + "'");
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java
new file mode 100644
index 0000000..8e1b3d1
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.util.List;
+
+import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.model.MConnector;
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestConnectorHandling extends MySqlTestCase {
+
+  @Test
+  public void testFindConnector() throws Exception {
+    // On empty repository, no connectors should be there
+    assertNull(handler.findConnector("A", provider.getConnection()));
+
+    // Register a single connector
+    handler.registerConnector(
+        getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
+        provider.getConnection());
+
+    // Retrieve it and compare with original
+    MConnector connector = handler.findConnector("A", provider.getConnection());
+    assertNotNull(connector);
+    assertEquals(
+        getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
+        connector);
+  }
+
+  @Test
+  public void testFindAllConnectors() throws Exception {
+    // No connectors in an empty repository, we expect an empty list
+    assertEquals(handler.findConnectors(provider.getConnection()).size(), 0);
+
+    // Register connectors
+    handler.registerConnector(
+        getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
+        provider.getConnection());
+    handler.registerConnector(
+        getConnector("B", "org.apache.sqoop.test.B", "1.0-test", true, true),
+        provider.getConnection());
+
+    // loadConfigurables();
+    // Retrieve connectors
+    List<MConnector> connectors = handler.findConnectors(provider
+        .getConnection());
+    assertNotNull(connectors);
+    assertEquals(connectors.size(), 2);
+    assertEquals(connectors.get(0).getUniqueName(), "A");
+    assertEquals(connectors.get(1).getUniqueName(), "B");
+  }
+
+  @Test
+  public void testRegisterConnector() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", true, true);
+    handler.registerConnector(connector, provider.getConnection());
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 6);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 12);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 9);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testFromDirection() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", true, false);
+
+    handler.registerConnector(connector, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 4);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 8);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 6);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testToDirection() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", false, true);
+
+    handler.registerConnector(connector, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 4);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 8);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 6);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testNeitherDirection() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", false, false);
+
+    handler.registerConnector(connector, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 2);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 4);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 3);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java
new file mode 100644
index 0000000..c3144ef
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.integration.repository.mysql;
+
+import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.model.MDriver;
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestDriverHandling extends MySqlTestCase {
+
+  private static final Object CURRENT_DRIVER_VERSION = "1";
+
+  @Test
+  public void testFindDriver() throws Exception {
+    // On empty repository, no driverConfig should be there
+    assertNull(handler
+        .findDriver(MDriver.DRIVER_NAME, provider.getConnection()));
+
+    // Register driver
+    handler.registerDriver(getDriver(), provider.getConnection());
+
+    // Retrieve it
+    MDriver driver = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection());
+    assertNotNull(driver);
+    assertNotNull(driver.getDriverConfig());
+    assertEquals("1", driver.getVersion());
+    assertEquals("1", driver.getVersion());
+
+    // Compare with original
+    assertEquals(getDriver().getDriverConfig(), driver.getDriverConfig());
+  }
+
+  @Test
+  public void testRegisterDriver() throws Exception {
+    MDriver driver = getDriver();
+    handler.registerDriver(driver, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, driver.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 2);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 4);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 3);
+
+    // Registered driver and config should be easily recovered back
+    MDriver retrieved = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(driver, retrieved);
+    assertEquals(driver.getVersion(), retrieved.getVersion());
+  }
+
+  @Test
+  public void testDriverVersionUpgrade() throws Exception {
+    MDriver driver = getDriver();
+    handler.registerDriver(driver, provider.getConnection());
+    String registeredDriverVersion = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection()).getVersion();
+    assertEquals(CURRENT_DRIVER_VERSION, registeredDriverVersion);
+    driver.setVersion("2");
+    handler.upgradeDriverAndConfigs(driver, provider.getConnection());
+    assertEquals("2", driver.getVersion());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java
new file mode 100644
index 0000000..bfa53bd
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.integration.repository.mysql;
+
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestHandler extends MySqlTestCase {
+
+  @Test
+  public void testHasLatestRepositoryVersion() throws Exception {
+    assertTrue(handler.isRepositorySuitableForUse(provider.getConnection()));
+  }
+
+  @Test
+  public void testDoubleUpdate() throws Exception {
+    handler.createOrUpgradeRepository(provider.getConnection());
+    assertTrue(handler.isRepositorySuitableForUse(provider.getConnection()));
+  }
+}