You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/08/20 03:41:23 UTC

[1/2] sqoop git commit: SQOOP-2461: Sqoop2: Add MySQL support for the metadata repository

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 1c24ecbde -> 668703cfe


http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestJobHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestJobHandling.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestJobHandling.java
new file mode 100644
index 0000000..0614d7a
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestJobHandling.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MMapInput;
+import org.apache.sqoop.model.MStringInput;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestJobHandling extends MySqlTestCase {
+
+  public static final String CONNECTOR_A_NAME = "A";
+  public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
+  public static final String CONNECTOR_A_VERSION = "1.0-test";
+  public static final String CONNECTOR_B_NAME = "B";
+  public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
+  public static final String CONNECTOR_B_VERSION = "1.0-test";
+  public static final String LINK_A_NAME = "Link-A";
+  public static final String LINK_B_NAME = "Link-B";
+  public static final String JOB_A_NAME = "Job-A";
+  public static final String JOB_B_NAME = "Job-B";
+
+  @BeforeMethod(alwaysRun = true)
+  public void setUp() throws Exception {
+    super.setUp();
+
+    handler.registerDriver(getDriver(), provider.getConnection());
+    MConnector connectorA = getConnector(CONNECTOR_A_NAME,
+        CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
+    MConnector connectorB = getConnector(CONNECTOR_B_NAME,
+        CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
+    handler.registerConnector(connectorA, provider.getConnection());
+    handler.registerConnector(connectorB, provider.getConnection());
+    MLink linkA = getLink(LINK_A_NAME, connectorA);
+    MLink linkB = getLink(LINK_B_NAME, connectorB);
+    handler.createLink(linkA, provider.getConnection());
+    handler.createLink(linkB, provider.getConnection());
+    handler.createJob(getJob(JOB_A_NAME, connectorA, connectorB, linkA, linkB),
+        provider.getConnection());
+    handler.createJob(getJob(JOB_B_NAME, connectorB, connectorA, linkB, linkA),
+        provider.getConnection());
+  }
+
+  @Test
+  public void testFindJobFail() throws Exception {
+    for (MJob job : handler.findJobs(provider.getConnection())) {
+      handler.deleteJob(job.getPersistenceId(), provider.getConnection());
+    }
+
+    // Let's try to find non existing job
+    assertNull(handler.findJob(1, provider.getConnection()));
+  }
+
+  @Test
+  public void testFindJobSuccess() throws Exception {
+    MJob firstJob = handler.findJob(1, provider.getConnection());
+    assertNotNull(firstJob);
+    assertEquals(1, firstJob.getPersistenceId());
+    assertEquals(JOB_A_NAME, firstJob.getName());
+
+    List<MConfig> configs;
+
+    configs = firstJob.getFromJobConfig().getConfigs();
+    assertEquals(2, configs.size());
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+
+    configs = firstJob.getToJobConfig().getConfigs();
+    assertEquals(2, configs.size());
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+
+    configs = firstJob.getDriverConfig().getConfigs();
+    assertEquals(2, configs.size());
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+  }
+
+  @Test
+  public void testFindJobs() throws Exception {
+    List<MJob> list;
+
+    list = handler.findJobs(provider.getConnection());
+    assertEquals(2, list.size());
+    assertEquals(JOB_A_NAME, list.get(0).getName());
+    assertEquals(JOB_B_NAME, list.get(1).getName());
+
+    // Delete jobs
+    for (MJob job : handler.findJobs(provider.getConnection())) {
+      handler.deleteJob(job.getPersistenceId(), provider.getConnection());
+    }
+
+    // Load all two links on loaded repository
+    list = handler.findJobs(provider.getConnection());
+    assertEquals(0, list.size());
+  }
+
+  @Test
+  public void testFindJobsByConnector() throws Exception {
+    List<MJob> list = handler
+        .findJobsForConnector(
+            handler.findConnector("A", provider.getConnection())
+                .getPersistenceId(), provider.getConnection());
+    assertEquals(2, list.size());
+    assertEquals(JOB_A_NAME, list.get(0).getName());
+    assertEquals(JOB_B_NAME, list.get(1).getName());
+  }
+
+  @Test
+  public void testFindJobsForNonExistingConnector() throws Exception {
+    List<MJob> list = handler
+        .findJobsForConnector(11, provider.getConnection());
+    assertEquals(0, list.size());
+  }
+
+  @Test
+  public void testExistsJob() throws Exception {
+    assertTrue(handler.existsJob(1, provider.getConnection()));
+    assertTrue(handler.existsJob(2, provider.getConnection()));
+    assertFalse(handler.existsJob(3, provider.getConnection()));
+
+    // Delete jobs
+    for (MJob job : handler.findJobs(provider.getConnection())) {
+      handler.deleteJob(job.getPersistenceId(), provider.getConnection());
+    }
+
+    // There shouldn't be anything on empty repository
+    assertFalse(handler.existsJob(1, provider.getConnection()));
+    assertFalse(handler.existsJob(2, provider.getConnection()));
+    assertFalse(handler.existsJob(3, provider.getConnection()));
+  }
+
+  @Test
+  public void testInUseJob() throws Exception {
+    MSubmission submission = getSubmission(
+        handler.findJob(1, provider.getConnection()), SubmissionStatus.RUNNING);
+    handler.createSubmission(submission, provider.getConnection());
+
+    assertTrue(handler.inUseJob(1, provider.getConnection()));
+    assertFalse(handler.inUseJob(2, provider.getConnection()));
+    assertFalse(handler.inUseJob(3, provider.getConnection()));
+  }
+
+  @Test
+  public void testCreateJob() throws Exception {
+    Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 12);
+
+    MJob retrieved = handler.findJob(1, provider.getConnection());
+    assertEquals(1, retrieved.getPersistenceId());
+
+    List<MConfig> configs;
+    configs = retrieved.getFromJobConfig().getConfigs();
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    configs = retrieved.getToJobConfig().getConfigs();
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+
+    configs = retrieved.getDriverConfig().getConfigs();
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+  }
+
+  @Test
+  public void testCreateDuplicateJob() throws Exception {
+    // Duplicate jobs
+    MJob job = handler.findJob(JOB_A_NAME, provider.getConnection());
+    job.setPersistenceId(MJob.PERSISTANCE_ID_DEFAULT);
+    try {
+      handler.createJob(job, provider.getConnection());
+      Assert.fail("SqoopException should be thrown.");
+    } catch (SqoopException se) {
+      // ignore the excepted exception.
+    }
+  }
+
+  @Test
+  public void testUpdateJob() throws Exception {
+    Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 12);
+
+    MJob job = handler.findJob(1, provider.getConnection());
+
+    List<MConfig> configs;
+
+    configs = job.getFromJobConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
+    ((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
+
+    configs = job.getToJobConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
+    ((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
+
+    configs = job.getDriverConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
+    ((MMapInput) configs.get(0).getInputs().get(1))
+        .setValue(new HashMap<String, String>()); // inject new map value
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Updated");
+    ((MMapInput) configs.get(1).getInputs().get(1))
+        .setValue(new HashMap<String, String>()); // inject new map value
+
+    job.setName("name");
+
+    handler.updateJob(job, provider.getConnection());
+
+    assertEquals(1, job.getPersistenceId());
+    Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 14);
+
+    MJob retrieved = handler.findJob(1, provider.getConnection());
+    assertEquals("name", retrieved.getName());
+
+    configs = job.getFromJobConfig().getConfigs();
+    assertEquals(2, configs.size());
+    assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    configs = job.getToJobConfig().getConfigs();
+    assertEquals(2, configs.size());
+    assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+
+    configs = retrieved.getDriverConfig().getConfigs();
+    assertEquals(2, configs.size());
+    assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
+    assertNotNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals(((Map) configs.get(0).getInputs().get(1).getValue()).size(), 0);
+  }
+
+  @Test
+  public void testEnableAndDisableJob() throws Exception {
+    // disable job 1
+    handler.enableJob(1, false, provider.getConnection());
+
+    MJob retrieved = handler.findJob(1, provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(false, retrieved.getEnabled());
+
+    // enable job 1
+    handler.enableJob(1, true, provider.getConnection());
+
+    retrieved = handler.findJob(1, provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(true, retrieved.getEnabled());
+  }
+
+  @Test
+  public void testDeleteJob() throws Exception {
+    handler.deleteJob(1, provider.getConnection());
+    Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 1);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 6);
+
+    handler.deleteJob(2, provider.getConnection());
+    Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 0);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestLinkHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestLinkHandling.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestLinkHandling.java
new file mode 100644
index 0000000..cceef09
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestLinkHandling.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.util.List;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MMapInput;
+import org.apache.sqoop.model.MStringInput;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestLinkHandling extends MySqlTestCase {
+
+  public static final String CONNECTOR_A_NAME = "A";
+  public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
+  public static final String CONNECTOR_A_VERSION = "1.0-test";
+  public static final String CONNECTOR_B_NAME = "B";
+  public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
+  public static final String CONNECTOR_B_VERSION = "1.0-test";
+  public static final String LINK_A_NAME = "Link-A";
+  public static final String LINK_B_NAME = "Link-B";
+
+  @BeforeMethod(alwaysRun = true)
+  public void setUp() throws Exception {
+    super.setUp();
+
+    handler.registerDriver(getDriver(), provider.getConnection());
+    MConnector connectorA = getConnector(CONNECTOR_A_NAME,
+        CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
+    MConnector connectorB = getConnector(CONNECTOR_B_NAME,
+        CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
+    handler.registerConnector(connectorA, provider.getConnection());
+    handler.registerConnector(connectorB, provider.getConnection());
+    MLink linkA = getLink(LINK_A_NAME, connectorA);
+    MLink linkB = getLink(LINK_B_NAME, connectorB);
+    handler.createLink(linkA, provider.getConnection());
+    handler.createLink(linkB, provider.getConnection());
+  }
+
+  @Test
+  public void testFindLinkFail() {
+    // Delete links
+    for (MLink link : handler.findLinks(provider.getConnection())) {
+      handler.deleteLink(link.getPersistenceId(), provider.getConnection());
+    }
+
+    assertNull(handler.findLink(1, provider.getConnection()));
+  }
+
+  @Test
+  public void testFindLinkSuccess() throws Exception {
+    MLink linkA = handler.findLink(1, provider.getConnection());
+    assertNotNull(linkA);
+    assertEquals(1, linkA.getPersistenceId());
+    assertEquals(LINK_A_NAME, linkA.getName());
+
+    // Check connector link config
+    List<MConfig> configs = linkA.getConnectorLinkConfig().getConfigs();
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+  }
+
+  @Test
+  public void testFindLinkByName() throws Exception {
+    // Load non-existing
+    assertNull(handler.findLink("non-existing", provider.getConnection()));
+
+    MLink linkA = handler.findLink(LINK_A_NAME, provider.getConnection());
+    assertNotNull(linkA);
+    assertEquals(1, linkA.getPersistenceId());
+    assertEquals(LINK_A_NAME, linkA.getName());
+
+    // Check connector link config
+    List<MConfig> configs = linkA.getConnectorLinkConfig().getConfigs();
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+  }
+
+  @Test
+  public void testFindLinks() throws Exception {
+    List<MLink> list;
+
+    // Load all two links on loaded repository
+    list = handler.findLinks(provider.getConnection());
+    assertEquals(2, list.size());
+    assertEquals(LINK_A_NAME, list.get(0).getName());
+    assertEquals(LINK_B_NAME, list.get(1).getName());
+
+    // Delete links
+    for (MLink link : handler.findLinks(provider.getConnection())) {
+      handler.deleteLink(link.getPersistenceId(), provider.getConnection());
+    }
+
+    // Load empty list on empty repository
+    list = handler.findLinks(provider.getConnection());
+    assertEquals(0, list.size());
+  }
+
+  @Test
+  public void testFindLinksByConnector() throws Exception {
+    List<MLink> list;
+    Long connectorId = handler.findConnector("A", provider.getConnection())
+        .getPersistenceId();
+
+    // Load all two links on loaded repository
+    list = handler.findLinksForConnector(connectorId, provider.getConnection());
+    assertEquals(1, list.size());
+    assertEquals(LINK_A_NAME, list.get(0).getName());
+
+    // Delete links
+    for (MLink link : handler.findLinks(provider.getConnection())) {
+      handler.deleteLink(link.getPersistenceId(), provider.getConnection());
+    }
+
+    // Load empty list on empty repository
+    list = handler.findLinksForConnector(connectorId, provider.getConnection());
+    assertEquals(0, list.size());
+  }
+
+  @Test
+  public void testFindLinksByNonExistingConnector() throws Exception {
+    List<MLink> list = handler.findLinksForConnector(11,
+        provider.getConnection());
+    assertEquals(0, list.size());
+  }
+
+  @Test
+  public void testExistsLink() throws Exception {
+    assertTrue(handler.existsLink(1, provider.getConnection()));
+    assertTrue(handler.existsLink(2, provider.getConnection()));
+    assertFalse(handler.existsLink(3, provider.getConnection()));
+
+    // Delete links
+    for (MLink link : handler.findLinks(provider.getConnection())) {
+      handler.deleteLink(link.getPersistenceId(), provider.getConnection());
+    }
+
+    assertFalse(handler.existsLink(1, provider.getConnection()));
+    assertFalse(handler.existsLink(2, provider.getConnection()));
+    assertFalse(handler.existsLink(3, provider.getConnection()));
+  }
+
+  @Test
+  public void testCreateLink() throws Exception {
+    List<MConfig> configs;
+
+    MLink retrieved = handler.findLink(1, provider.getConnection());
+    assertEquals(1, retrieved.getPersistenceId());
+
+    configs = retrieved.getConnectorLinkConfig().getConfigs();
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+
+    retrieved = handler.findLink(2, provider.getConnection());
+    assertEquals(2, retrieved.getPersistenceId());
+
+    configs = retrieved.getConnectorLinkConfig().getConfigs();
+    assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+
+    Assert
+        .assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_LINK")), 2);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_LINK_INPUT")), 4);
+  }
+
+  @Test
+  public void testCreateDuplicateLink() throws SqoopException {
+    MLink link = handler.findLink(LINK_A_NAME, provider.getConnection());
+    link.setPersistenceId(MLink.PERSISTANCE_ID_DEFAULT);
+    try {
+      handler.createLink(link, provider.getConnection());
+      Assert.fail("SqoopException should be thrown.");
+    } catch (SqoopException se) {
+      // ignore the excepted exception.
+    }
+  }
+
+  @Test
+  public void testInUseLink() throws Exception {
+    assertFalse(handler.inUseLink(1, provider.getConnection()));
+
+    // Create job and submission and make that job in use to make sure link is
+    // in use.
+    MLink linkA = handler.findLink(LINK_A_NAME, provider.getConnection());
+    MJob job = getJob("Job-A",
+        handler.findConnector("A", provider.getConnection()),
+        handler.findConnector("B", provider.getConnection()), linkA,
+        handler.findLink(LINK_B_NAME, provider.getConnection()));
+    handler.createJob(job, provider.getConnection());
+    MSubmission submission = getSubmission(job, SubmissionStatus.RUNNING);
+    handler.createSubmission(submission, provider.getConnection());
+
+    assertTrue(handler.inUseLink(linkA.getPersistenceId(),
+        provider.getConnection()));
+  }
+
+  @Test
+  public void testUpdateLink() throws Exception {
+    MLink link = handler.findLink(1, provider.getConnection());
+
+    List<MConfig> configs;
+
+    configs = link.getConnectorLinkConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
+    ((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Updated");
+    ((MMapInput) configs.get(1).getInputs().get(1)).setValue(null);
+
+    link.setName("name");
+
+    handler.updateLink(link, provider.getConnection());
+
+    assertEquals(1, link.getPersistenceId());
+    Assert
+        .assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_LINK")), 2);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_LINK_INPUT")), 4);
+
+    MLink retrieved = handler.findLink(1, provider.getConnection());
+    assertEquals("name", link.getName());
+
+    configs = retrieved.getConnectorLinkConfig().getConfigs();
+    assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
+    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals("Updated", configs.get(1).getInputs().get(0).getValue());
+    assertNull(configs.get(1).getInputs().get(1).getValue());
+  }
+
+  @Test
+  public void testEnableAndDisableLink() throws Exception {
+    // disable link 1
+    handler.enableLink(1, false, provider.getConnection());
+
+    MLink retrieved = handler.findLink(1, provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(false, retrieved.getEnabled());
+
+    // enable link 1
+    handler.enableLink(1, true, provider.getConnection());
+
+    retrieved = handler.findLink(1, provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(true, retrieved.getEnabled());
+  }
+
+  @Test
+  public void testDeleteLink() throws Exception {
+    handler.deleteLink(1, provider.getConnection());
+    Assert
+        .assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_LINK")), 1);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_LINK_INPUT")), 2);
+
+    handler.deleteLink(2, provider.getConnection());
+    Assert
+        .assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_LINK")), 0);
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_LINK_INPUT")), 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestStructure.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestStructure.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestStructure.java
new file mode 100644
index 0000000..3dec58c
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestStructure.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestStructure extends MySqlTestCase {
+
+  @Test
+  public void testTables() throws Exception {
+    utils.assertTableExists("SQOOP", "SQ_SYSTEM");
+    utils.assertTableExists("SQOOP", "SQ_DIRECTION");
+    utils.assertTableExists("SQOOP", "SQ_CONFIGURABLE");
+    utils.assertTableExists("SQOOP", "SQ_CONNECTOR_DIRECTIONS");
+    utils.assertTableExists("SQOOP", "SQ_CONFIG");
+    utils.assertTableExists("SQOOP", "SQ_CONNECTOR_DIRECTIONS");
+    utils.assertTableExists("SQOOP", "SQ_INPUT");
+    utils.assertTableExists("SQOOP", "SQ_INPUT_RELATION");
+    utils.assertTableExists("SQOOP", "SQ_LINK");
+    utils.assertTableExists("SQOOP", "SQ_JOB");
+    utils.assertTableExists("SQOOP", "SQ_LINK_INPUT");
+    utils.assertTableExists("SQOOP", "SQ_JOB_INPUT");
+    utils.assertTableExists("SQOOP", "SQ_SUBMISSION");
+    utils.assertTableExists("SQOOP", "SQ_COUNTER_GROUP");
+    utils.assertTableExists("SQOOP", "SQ_COUNTER");
+    utils.assertTableExists("SQOOP", "SQ_COUNTER_SUBMISSION");
+  }
+
+  @Test
+  public void testForeignKeys() throws Exception {
+    utils.assertForeignKey("SQOOP", "SQ_CONFIGURABLE", "SQC_ID", "SQ_CONNECTOR_DIRECTIONS", "SQCD_CONNECTOR");
+    utils.assertForeignKey("SQOOP", "SQ_DIRECTION", "SQD_ID", "SQ_CONNECTOR_DIRECTIONS", "SQCD_DIRECTION");
+    utils.assertForeignKey("SQOOP", "SQ_CONFIGURABLE", "SQC_ID", "SQ_CONFIG", "SQ_CFG_CONFIGURABLE");
+    utils.assertForeignKey("SQOOP", "SQ_CONFIG", "SQ_CFG_ID", "SQ_CONFIG_DIRECTIONS", "SQ_CFG_DIR_CONFIG");
+    utils.assertForeignKey("SQOOP", "SQ_DIRECTION", "SQD_ID", "SQ_CONFIG_DIRECTIONS", "SQ_CFG_DIR_DIRECTION");
+    utils.assertForeignKey("SQOOP", "SQ_CONFIG", "SQ_CFG_ID", "SQ_INPUT", "SQI_CONFIG");
+    utils.assertForeignKey("SQOOP", "SQ_CONFIGURABLE", "SQC_ID", "SQ_LINK", "SQ_LNK_CONFIGURABLE");
+    utils.assertForeignKey("SQOOP", "SQ_LINK", "SQ_LNK_ID", "SQ_JOB", "SQB_FROM_LINK");
+    utils.assertForeignKey("SQOOP", "SQ_LINK", "SQ_LNK_ID", "SQ_JOB", "SQB_TO_LINK");
+    utils.assertForeignKey("SQOOP", "SQ_LINK", "SQ_LNK_ID", "SQ_LINK_INPUT", "SQ_LNKI_LINK");
+    utils.assertForeignKey("SQOOP", "SQ_INPUT", "SQI_ID", "SQ_LINK_INPUT", "SQ_LNKI_INPUT");
+    utils.assertForeignKey("SQOOP", "SQ_INPUT", "SQI_ID", "SQ_INPUT_RELATION", "SQIR_PARENT_ID");
+    utils.assertForeignKey("SQOOP", "SQ_INPUT", "SQI_ID", "SQ_INPUT_RELATION", "SQIR_CHILD_ID");
+    utils.assertForeignKey("SQOOP", "SQ_JOB", "SQB_ID", "SQ_JOB_INPUT", "SQBI_JOB");
+    utils.assertForeignKey("SQOOP", "SQ_INPUT", "SQI_ID", "SQ_JOB_INPUT", "SQBI_INPUT");
+    utils.assertForeignKey("SQOOP", "SQ_JOB", "SQB_ID", "SQ_SUBMISSION", "SQS_JOB");
+    utils.assertForeignKey("SQOOP", "SQ_COUNTER", "SQR_ID", "SQ_COUNTER_SUBMISSION", "SQRS_COUNTER");
+    utils.assertForeignKey("SQOOP", "SQ_COUNTER_GROUP", "SQG_ID", "SQ_COUNTER_SUBMISSION", "SQRS_GROUP");
+    utils.assertForeignKey("SQOOP", "SQ_SUBMISSION", "SQS_ID", "SQ_COUNTER_SUBMISSION", "SQRS_SUBMISSION");
+  }
+
+  @Test
+  public void testUniqueConstraints() throws Exception {
+    utils.assertUniqueConstraints("SQOOP", "SQ_CONFIGURABLE", "SQC_NAME");
+    utils.assertUniqueConstraints("SQOOP", "SQ_LINK", "SQ_LNK_NAME");
+    utils.assertUniqueConstraints("SQOOP", "SQ_JOB", "SQB_NAME");
+    utils.assertUniqueConstraints("SQOOP", "SQ_CONFIG", "SQ_CFG_NAME", "SQ_CFG_CONFIGURABLE", "SQ_CFG_TYPE");
+    utils.assertUniqueConstraints("SQOOP", "SQ_INPUT", "SQI_NAME", "SQI_TYPE", "SQI_CONFIG");
+    utils.assertUniqueConstraints("SQOOP", "SQ_COUNTER", "SQR_NAME");
+    utils.assertUniqueConstraints("SQOOP", "SQ_COUNTER_GROUP", "SQG_NAME");
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestSubmissionHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestSubmissionHandling.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestSubmissionHandling.java
new file mode 100644
index 0000000..e2a3011
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestSubmissionHandling.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counter;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestSubmissionHandling extends MySqlTestCase {
+  public static final String CONNECTOR_A_NAME = "A";
+  public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
+  public static final String CONNECTOR_A_VERSION = "1.0-test";
+  public static final String CONNECTOR_B_NAME = "B";
+  public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
+  public static final String CONNECTOR_B_VERSION = "1.0-test";
+  public static final String LINK_A_NAME = "Link-A";
+  public static final String LINK_B_NAME = "Link-B";
+  public static final String JOB_A_NAME = "Job-A";
+  public static final String JOB_B_NAME = "Job-B";
+
+  @BeforeMethod(alwaysRun = true)
+  public void setUp() throws Exception {
+    super.setUp();
+
+    handler.registerDriver(getDriver(), provider.getConnection());
+    MConnector connectorA = getConnector(CONNECTOR_A_NAME,
+        CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
+    MConnector connectorB = getConnector(CONNECTOR_B_NAME,
+        CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
+    handler.registerConnector(connectorA, provider.getConnection());
+    handler.registerConnector(connectorB, provider.getConnection());
+    MLink linkA = getLink(LINK_A_NAME, connectorA);
+    MLink linkB = getLink(LINK_B_NAME, connectorB);
+    handler.createLink(linkA, provider.getConnection());
+    handler.createLink(linkB, provider.getConnection());
+    MJob jobA = getJob(JOB_A_NAME, connectorA, connectorB, linkA, linkB);
+    MJob jobB = getJob(JOB_B_NAME, connectorB, connectorA, linkB, linkA);
+    handler.createJob(jobA, provider.getConnection());
+    handler.createJob(jobB, provider.getConnection());
+  }
+
+  private void loadSubmissions() throws Exception {
+    MJob jobA = handler.findJob(JOB_A_NAME, provider.getConnection());
+    MJob jobB = handler.findJob(JOB_B_NAME, provider.getConnection());
+
+    MSubmission submissionA = getSubmission(jobA, SubmissionStatus.RUNNING);
+    submissionA.getCounters().getCounterGroup("test-1")
+        .addCounter(new Counter("counter-1"));
+    submissionA.getCounters().getCounterGroup("test-1")
+        .addCounter(new Counter("counter-2"));
+    submissionA.getCounters().getCounterGroup("test-1").getCounter("counter-1")
+        .setValue(300);
+    MSubmission submissionB = getSubmission(jobA, SubmissionStatus.SUCCEEDED);
+    MSubmission submissionC = getSubmission(jobB, SubmissionStatus.FAILED);
+    MSubmission submissionD = getSubmission(jobB, SubmissionStatus.UNKNOWN);
+    handler.createSubmission(submissionA, provider.getConnection());
+    handler.createSubmission(submissionB, provider.getConnection());
+    handler.createSubmission(submissionC, provider.getConnection());
+    handler.createSubmission(submissionD, provider.getConnection());
+  }
+
+  @Test
+  public void testFindSubmissionsUnfinished() throws Exception {
+    List<MSubmission> submissions;
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+    assertNotNull(submissions);
+    assertEquals(0, submissions.size());
+
+    loadSubmissions();
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+  }
+
+  @Test
+  public void testExistsSubmission() throws Exception {
+    // There shouldn't be anything on empty repository
+    assertFalse(handler.existsSubmission(1, provider.getConnection()));
+    assertFalse(handler.existsSubmission(2, provider.getConnection()));
+    assertFalse(handler.existsSubmission(3, provider.getConnection()));
+    assertFalse(handler.existsSubmission(4, provider.getConnection()));
+    assertFalse(handler.existsSubmission(5, provider.getConnection()));
+
+    loadSubmissions();
+
+    assertTrue(handler.existsSubmission(1, provider.getConnection()));
+    assertTrue(handler.existsSubmission(2, provider.getConnection()));
+    assertTrue(handler.existsSubmission(3, provider.getConnection()));
+    assertTrue(handler.existsSubmission(4, provider.getConnection()));
+    assertFalse(handler.existsSubmission(5, provider.getConnection()));
+  }
+
+  @Test
+  public void testCreateSubmission() throws Exception {
+    Date creationDate = new Date();
+    Date updateDate = new Date();
+
+    CounterGroup firstGroup = new CounterGroup("ga");
+    CounterGroup secondGroup = new CounterGroup("gb");
+    firstGroup.addCounter(new Counter("ca", 100));
+    firstGroup.addCounter(new Counter("cb", 200));
+    secondGroup.addCounter(new Counter("ca", 300));
+    secondGroup.addCounter(new Counter("cd", 400));
+    Counters counters = new Counters();
+    counters.addCounterGroup(firstGroup);
+    counters.addCounterGroup(secondGroup);
+
+    MutableContext fromContext = new MutableMapContext();
+    MutableContext toContext = new MutableMapContext();
+    MutableContext driverContext = new MutableMapContext();
+    fromContext.setString("from1", "value1");
+    fromContext.setString("from2", "value2");
+    toContext.setString("to1", "value1");
+    toContext.setString("to2", "value2");
+    driverContext.setString("driver1", "value1");
+    driverContext.setString("driver2", "value2");
+
+    MSubmission submission = new MSubmission();
+    submission.setJobId(1);
+    submission.setStatus(SubmissionStatus.RUNNING);
+    submission.setCreationDate(creationDate);
+    submission.setLastUpdateDate(updateDate);
+    submission.setExternalJobId("job-x");
+    submission.setExternalLink("http://somewhere");
+    submission.getError().setErrorSummary("RuntimeException");
+    submission.getError().setErrorDetails("Yeah it happens");
+    submission.setCounters(counters);
+    submission.setFromConnectorContext(fromContext);
+    submission.setToConnectorContext(toContext);
+    submission.setDriverContext(driverContext);
+
+    handler.createSubmission(submission, provider.getConnection());
+
+    assertEquals(1, submission.getPersistenceId());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 1);
+
+    List<MSubmission> submissions = handler.findUnfinishedSubmissions(provider
+        .getConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+
+    submission = submissions.get(0);
+
+    assertEquals(1, submission.getJobId());
+    assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
+    long exceptedData = creationDate.getTime();
+    long actualData = submission.getCreationDate().getTime();
+    long diff = Math.abs(exceptedData - actualData);
+    // the date is saved as Timestamp, there maybe 1 sec diff between the set
+    // value and get value
+    assertTrue(diff < 2000);
+    exceptedData = updateDate.getTime();
+    actualData = submission.getLastUpdateDate().getTime();
+    diff = Math.abs(exceptedData - actualData);
+    assertTrue(diff < 2000);
+    assertEquals("job-x", submission.getExternalJobId());
+    assertEquals("http://somewhere", submission.getExternalLink());
+    assertEquals("RuntimeException", submission.getError().getErrorSummary());
+    assertEquals("Yeah it happens", submission.getError().getErrorDetails());
+
+    CounterGroup group;
+    Counter counter;
+    Counters retrievedCounters = submission.getCounters();
+    assertNotNull(retrievedCounters);
+
+    group = counters.getCounterGroup("ga");
+    assertNotNull(group);
+
+    counter = group.getCounter("ca");
+    assertNotNull(counter);
+    assertEquals(100, counter.getValue());
+
+    counter = group.getCounter("cb");
+    assertNotNull(counter);
+    assertEquals(200, counter.getValue());
+
+    group = counters.getCounterGroup("gb");
+    assertNotNull(group);
+
+    counter = group.getCounter("ca");
+    assertNotNull(counter);
+    assertEquals(300, counter.getValue());
+
+    counter = group.getCounter("cd");
+    assertNotNull(counter);
+    assertEquals(400, counter.getValue());
+
+    assertNotNull(submission.getFromConnectorContext());
+    assertNotNull(submission.getToConnectorContext());
+    assertNotNull(submission.getDriverContext());
+    assertEquals(submission.getFromConnectorContext().getString("from1"),
+        "value1");
+    assertEquals(submission.getFromConnectorContext().getString("from2"),
+        "value2");
+    assertEquals(submission.getToConnectorContext().getString("to1"), "value1");
+    assertEquals(submission.getToConnectorContext().getString("to2"), "value2");
+    assertEquals(submission.getDriverContext().getString("driver1"), "value1");
+    assertEquals(submission.getDriverContext().getString("driver2"), "value2");
+
+    // Let's create second (simpler) connection
+    submission = new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED,
+        "job-x");
+    handler.createSubmission(submission, provider.getConnection());
+
+    assertEquals(2, submission.getPersistenceId());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 2);
+  }
+
+  @Test
+  public void testUpdateSubmission() throws Exception {
+    loadSubmissions();
+
+    List<MSubmission> submissions = handler.findUnfinishedSubmissions(provider
+        .getConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+
+    MSubmission submission = submissions.get(0);
+    submission.setStatus(SubmissionStatus.SUCCEEDED);
+
+    handler.updateSubmission(submission, provider.getConnection());
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+    assertNotNull(submissions);
+    assertEquals(0, submissions.size());
+  }
+
+  @Test
+  public void testCreateSubmissionExceptionDetailsMoreThanMaxLimit()
+      throws Exception {
+
+    String externalLink = "http://somewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresomewheresom"
+        + "ewheresomewheresomewheresomewheresomewher";
+
+    String errorSummary = "RuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptions"
+        + "RuntimeExceptionRuntimeExceptionRuntimeExceptiontests";
+    String errorDetail = "Yeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+        + " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it hap"
+        + "pensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYea"
+        + "h it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it ha"
+        + "ppensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+        + " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happe"
+        + "nsYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happens";
+    MSubmission submission = new MSubmission();
+    submission.setJobId(1);
+    submission.setStatus(SubmissionStatus.RUNNING);
+    submission.setCreationDate(new Date());
+    submission.setLastUpdateDate(new Date());
+    submission.setExternalJobId("job-x");
+    submission.setExternalLink(externalLink + "more than 150");
+    submission.getError().setErrorSummary("RuntimeException");
+    submission.getError().setErrorDetails(errorDetail + "morethan750");
+    submission.getError().setErrorSummary(errorSummary + "morethan150");
+
+    handler.createSubmission(submission, provider.getConnection());
+    List<MSubmission> submissions = handler.findSubmissionsForJob(1,
+        provider.getConnection());
+    assertNotNull(submissions);
+
+    assertEquals(errorDetail, submissions.get(0).getError().getErrorDetails());
+    assertEquals(errorSummary, submissions.get(0).getError().getErrorSummary());
+    assertEquals(externalLink, submissions.get(0).getExternalLink());
+
+  }
+
+  @Test
+  public void testUpdateSubmissionExceptionDetailsMoreThanMaxLimit()
+      throws Exception {
+    loadSubmissions();
+
+    List<MSubmission> submissions = handler.findUnfinishedSubmissions(provider
+        .getConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+
+    String errorSummary = "RuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptionRuntimeExceptions"
+        + "RuntimeExceptionRuntimeExceptionRuntimeExceptiontests";
+
+    String errorDetail = "Yeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+        + " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it hap"
+        + "pensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYea"
+        + "h it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it ha"
+        + "ppensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah"
+        + " it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happe"
+        + "nsYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happensYeah it happens";
+    MSubmission submission = submissions.get(0);
+    String externalLink = submission.getExternalLink();
+    submission.getError().setErrorDetails(errorDetail + "morethan750");
+    submission.getError().setErrorSummary(errorSummary + "morethan150");
+    submission.setExternalLink("cantupdate");
+
+    handler.updateSubmission(submission, provider.getConnection());
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+
+    assertNotNull(submissions);
+    assertEquals(errorDetail, submissions.get(0).getError().getErrorDetails());
+    assertEquals(errorSummary, submissions.get(0).getError().getErrorSummary());
+    // note we dont allow external link update
+    assertEquals(externalLink, submissions.get(0).getExternalLink());
+
+  }
+
+  @Test
+  public void testPurgeSubmissions() throws Exception {
+    loadSubmissions();
+    List<MSubmission> submissions;
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 4);
+
+    Calendar calendar = Calendar.getInstance();
+    // 2012-01-03 05:05:05
+    calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5);
+    handler.purgeSubmissions(calendar.getTime(), provider.getConnection());
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 4);
+
+    calendar.setTime(new Date());
+    calendar.add(Calendar.DATE, 1);
+    handler.purgeSubmissions(calendar.getTime(), provider.getConnection());
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+    assertNotNull(submissions);
+    assertEquals(0, submissions.size());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 0);
+
+    handler.purgeSubmissions(new Date(), provider.getConnection());
+
+    submissions = handler.findUnfinishedSubmissions(provider.getConnection());
+    assertNotNull(submissions);
+    assertEquals(0, submissions.size());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 0);
+  }
+
+  /**
+   * Test that by directly removing jobs we will also remove associated
+   * submissions and counters.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDeleteJobs() throws Exception {
+    MJob jobA = handler.findJob(JOB_A_NAME, provider.getConnection());
+    MJob jobB = handler.findJob(JOB_B_NAME, provider.getConnection());
+
+    loadSubmissions();
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 4);
+
+    handler.deleteJob(jobA.getPersistenceId(), provider.getConnection());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 2);
+
+    handler.deleteJob(jobB.getPersistenceId(), provider.getConnection());
+    Assert.assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_SUBMISSION")), 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index aabefc0..59663fa 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -74,6 +74,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.sqoop.repository</groupId>
+      <artifactId>sqoop-repository-mysql</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-generic-jdbc</artifactId>
     </dependency>


[2/2] sqoop git commit: SQOOP-2461: Sqoop2: Add MySQL support for the metadata repository

Posted by ab...@apache.org.
SQOOP-2461: Sqoop2: Add MySQL support for the metadata repository

(Colin Ma via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/668703cf
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/668703cf
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/668703cf

Branch: refs/heads/sqoop2
Commit: 668703cfeb204640fa76aabdb406c46920a9fe27
Parents: 1c24ecb
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Wed Aug 19 18:40:29 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Wed Aug 19 18:40:29 2015 -0700

----------------------------------------------------------------------
 .../sqoop/common/test/db/DatabaseProvider.java  |   9 +
 .../sqoop/common/test/db/MySQLProvider.java     |  29 +-
 .../apache/sqoop/error/code/MySqlRepoError.java |  49 +++
 pom.xml                                         |   5 +
 repository/pom.xml                              |   1 +
 .../repository/common/CommonRepoUtils.java      |   4 +
 repository/repository-mysql/pom.xml             | 124 ++++++
 .../repository/mysql/MySqlRepoConstants.java    |  34 ++
 .../mysql/MySqlRepositoryHandler.java           | 228 +++++++++++
 .../mysql/MySqlSchemaCreateQuery.java           | 297 ++++++++++++++
 .../repository/mysql/MySqlSchemaQuery.java      |  47 +++
 ...RepositoryInsertUpdateDeleteSelectQuery.java |  59 +++
 .../repository/mysql/MySqlTestCase.java         | 186 +++++++++
 .../repository/mysql/MySqlTestUtils.java        | 105 +++++
 .../repository/mysql/TestConnectorHandling.java | 166 ++++++++
 .../repository/mysql/TestDriverHandling.java    |  89 ++++
 .../repository/mysql/TestHandler.java           |  39 ++
 .../repository/mysql/TestJobHandling.java       | 302 ++++++++++++++
 .../repository/mysql/TestLinkHandling.java      | 297 ++++++++++++++
 .../repository/mysql/TestStructure.java         |  81 ++++
 .../mysql/TestSubmissionHandling.java           | 406 +++++++++++++++++++
 server/pom.xml                                  |   5 +
 22 files changed, 2560 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
index a6ae490..4f4d347 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
@@ -467,4 +467,13 @@ abstract public class DatabaseProvider {
 
     return escapeSchemaName(tableName.getSchemaName()) + "." + escapeTableName(tableName.getTableName());
   }
+
+  /**
+   * Drop database, this should be implemented for the DatabaseProvider like:
+   * MySqlProvider.
+   *
+   * @param databaseName
+   */
+  public void dropDatabase(String databaseName) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
index 3083ee6..cef59bb 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/MySQLProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.common.test.db;
 
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.test.db.types.DatabaseTypeList;
 import org.apache.sqoop.common.test.db.types.MySQLTypeList;
 
@@ -27,6 +28,7 @@ import org.apache.sqoop.common.test.db.types.MySQLTypeList;
  * on the same box (localhost) that is access via sqoop/sqoop credentials.
  */
 public class MySQLProvider extends DatabaseProvider {
+  private static final Logger LOG = Logger.getLogger(MySQLProvider.class);
 
   public static final String DRIVER = "com.mysql.jdbc.Driver";
 
@@ -70,9 +72,19 @@ public class MySQLProvider extends DatabaseProvider {
     return escape(tableName);
   }
 
+  public String escapeDatabaseName(String databaseName) {
+    return escape(databaseName);
+  }
+
+  // the scheme name is the same as database name.
+  @Override
+  public boolean isSupportingScheme() {
+    return true;
+  }
+
   @Override
   public String escapeValueString(String value) {
-    return "\"" + value + "\"";
+    return escape(value);
   }
 
   @Override
@@ -84,7 +96,20 @@ public class MySQLProvider extends DatabaseProvider {
   public DatabaseTypeList getDatabaseTypes() {
     return new MySQLTypeList();
   }
+
+  @Override
+  public void dropDatabase(String databaseName) {
+    StringBuilder sb = new StringBuilder("DROP DATABASE ");
+    sb.append(escapeDatabaseName(databaseName));
+
+    try {
+      executeUpdate(sb.toString());
+    } catch (RuntimeException e) {
+      LOG.info("Ignoring exception: " + e);
+    }
+  }
+
   public String escape(String entity) {
-    return "`" + entity + "`";
+    return "\"" + entity + "\"";
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java b/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java
new file mode 100644
index 0000000..399f741
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/error/code/MySqlRepoError.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.error.code;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum MySqlRepoError implements ErrorCode {
+
+  /** An unknown error has occurred. */
+  MYSQLREPO_0000("An unknown error has occurred"),
+
+  /** The MySQL Repository handler was unable to add directions. */
+  MYSQLREPO_0001("Could not add directions"),
+
+  /** The system was unable to get ID of recently added direction. */
+  MYSQLREPO_0002("Could not get ID of recently added direction"),
+
+  ;
+
+  private final String message;
+
+  private MySqlRepoError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b1dbdc3..25ad00c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -304,6 +304,11 @@ limitations under the License.
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.sqoop.repository</groupId>
+        <artifactId>sqoop-repository-mysql</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.sqoop</groupId>
         <artifactId>connector-sdk</artifactId>
         <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index c63595c..6a9fbfd 100644
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -36,6 +36,7 @@ limitations under the License.
     <module>repository-common</module>
     <module>repository-derby</module>
     <module>repository-postgresql</module>
+    <module>repository-mysql</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
----------------------------------------------------------------------
diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
index 73293c0..df41fb1 100644
--- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
+++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepoUtils.java
@@ -39,6 +39,10 @@ public class CommonRepoUtils {
     return QUOTE_CHARACTER + schemaName + QUOTE_CHARACTER;
   }
 
+  public static final String escapeDatabaseName(String databaseName) {
+    return QUOTE_CHARACTER + databaseName + QUOTE_CHARACTER;
+  }
+
   public static final String escapeConstraintName(String constraintName) {
     return QUOTE_CHARACTER + constraintName + QUOTE_CHARACTER;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/pom.xml b/repository/repository-mysql/pom.xml
new file mode 100644
index 0000000..15e909e
--- /dev/null
+++ b/repository/repository-mysql/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>repository</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.repository</groupId>
+  <artifactId>sqoop-repository-mysql</artifactId>
+  <name>Sqoop MySQL Repository</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-common-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+    </dependency>
+	<dependency>
+      <groupId>org.apache.sqoop.repository</groupId>
+      <artifactId>sqoop-repository-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludedGroups>mysql</excludedGroups>
+          <excludes>
+            <exclude>**/integration/**</exclude>
+          </excludes>
+          <systemPropertyVariables>
+            <sqoop.integration.tmpdir>${project.build.directory}</sqoop.integration.tmpdir>
+          </systemPropertyVariables>
+        </configuration>
+        <executions>
+          <execution>
+            <id>integration-test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <phase>integration-test</phase>
+            <configuration>
+              <excludes>
+                <exclude>none</exclude>
+              </excludes>
+              <includes>
+                <include>**/integration/**</include>
+              </includes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>mysql</id>
+      <activation>
+        <property>
+          <name>mysql</name>
+        </property>
+      </activation>
+
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <groups>mysql</groups>
+              <excludedGroups>none</excludedGroups>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java
new file mode 100644
index 0000000..c5fd241
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepoConstants.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+public class MySqlRepoConstants {
+  public static final String CONF_PREFIX_MYSQL = "mysql.";
+
+  /**
+   * Expected version of the repository structures.
+   *
+   * History:
+   * 1 - Version
+   */
+  public static final int LATEST_MYSQL_REPOSITORY_VERSION = 1;
+
+  private MySqlRepoConstants() {
+    // Disable explicit object creation
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java
new file mode 100644
index 0000000..61b058a
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlRepositoryHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.error.code.MySqlRepoError;
+import org.apache.sqoop.repository.JdbcRepositoryContext;
+import org.apache.sqoop.repository.common.CommonRepoConstants;
+import org.apache.sqoop.repository.common.CommonRepositoryHandler;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+
+/**
+ * JDBC based repository handler for MySQL database.
+ *
+ * Repository implementation for MySQL database.
+ */
+public class MySqlRepositoryHandler extends CommonRepositoryHandler {
+
+  private static final Logger LOG =
+      Logger.getLogger(MySqlRepositoryHandler.class);
+
+  private JdbcRepositoryContext repoContext;
+
+  public MySqlRepositoryHandler() {
+    crudQueries = new MysqlRepositoryInsertUpdateDeleteSelectQuery();
+  }
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String name() {
+    return "MySql";
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void initialize(JdbcRepositoryContext ctx) {
+    repoContext = ctx;
+    LOG.info("MySqlRepositoryHandler initialized.");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void shutdown() {
+  }
+
+  /**
+   * Detect version of underlying database structures
+   *
+   * @param conn
+   *          JDBC Connection
+   * @return
+   */
+  public int detectRepositoryVersion(Connection conn) {
+    ResultSet rs = null, metadataResultSet = null;
+    PreparedStatement stmt = null;
+
+    // Select and return the version
+    try {
+      DatabaseMetaData md = conn.getMetaData();
+      metadataResultSet = md.getTables(null,
+          CommonRepositorySchemaConstants.SCHEMA_SQOOP,
+          CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME, null);
+
+      if (metadataResultSet.next()) {
+        stmt = conn.prepareStatement(MySqlSchemaQuery.STMT_SELECT_SYSTEM);
+        stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
+        rs = stmt.executeQuery();
+
+        if (!rs.next()) {
+          return 0;
+        }
+
+        return rs.getInt(1);
+      }
+    } catch (SQLException e) {
+      LOG.info("Can't fetch repository structure version.", e);
+      return 0;
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+
+    return 0;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void createOrUpgradeRepository(Connection conn) {
+
+    int version = detectRepositoryVersion(conn);
+    LOG.info("Detected repository version: " + version);
+
+    if (version == MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION) {
+      return;
+    }
+
+    if (version < 1) {
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_DATABASE_SQOOP, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIGURABLE, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_INPUT, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_INPUT_RELATION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_LINK, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_JOB, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_SYSTEM, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY, conn);
+      runQuery(MySqlSchemaCreateQuery.QUERY_CREATE_TABLE_SQ_CONTEXT, conn);
+
+      // Insert FROM and TO directions.
+      insertDirections(conn);
+    }
+
+    ResultSet rs = null;
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn
+          .prepareStatement(MySqlSchemaQuery.STMT_INSERT_ON_DUPLICATE_KEY_SYSTEM);
+      stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
+      stmt.setString(2,
+          Integer.toString(MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION));
+      stmt.setString(3,
+          Integer.toString(MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION));
+      stmt.executeUpdate();
+    } catch (SQLException e) {
+      LOG.error("Can't persist the repository version", e);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * Insert directions: FROM and TO.
+   *
+   * @param conn
+   * @return Map<Direction, Long> direction ID => Direction
+   */
+  protected Map<Direction, Long> insertDirections(Connection conn) {
+    // Add directions
+    Map<Direction, Long> directionMap = new TreeMap<Direction, Long>();
+    PreparedStatement insertDirectionStmt = null;
+    try {
+      // Insert directions and get IDs.
+      for (Direction direction : Direction.values()) {
+        insertDirectionStmt = conn.prepareStatement(
+            MySqlSchemaQuery.STMT_INSERT_DIRECTION,
+            Statement.RETURN_GENERATED_KEYS);
+        insertDirectionStmt.setString(1, direction.toString());
+        if (insertDirectionStmt.executeUpdate() != 1) {
+          throw new SqoopException(MySqlRepoError.MYSQLREPO_0001,
+              "Could not add directions FROM and TO.");
+        }
+
+        ResultSet directionId = insertDirectionStmt.getGeneratedKeys();
+        if (directionId.next()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Loaded direction: " + directionId.getLong(1));
+          }
+
+          directionMap.put(direction, directionId.getLong(1));
+        } else {
+          throw new SqoopException(MySqlRepoError.MYSQLREPO_0002,
+              "Could not get ID of direction " + direction);
+        }
+      }
+    } catch (SQLException e) {
+      throw new SqoopException(MySqlRepoError.MYSQLREPO_0000, e);
+    } finally {
+      closeStatements(insertDirectionStmt);
+    }
+
+    return directionMap;
+  }
+
+  @Override
+  public boolean isRepositorySuitableForUse(Connection conn) {
+    int version = detectRepositoryVersion(conn);
+
+    if (version != MySqlRepoConstants.LATEST_MYSQL_REPOSITORY_VERSION) {
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java
new file mode 100644
index 0000000..46493a3
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaCreateQuery.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.SCHEMA_SQOOP;
+
+import org.apache.sqoop.repository.common.CommonRepoUtils;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+
+public class MySqlSchemaCreateQuery {
+
+  public static final String QUERY_CREATE_DATABASE_SQOOP = "CREATE DATABASE " + CommonRepoUtils.escapeDatabaseName(SCHEMA_SQOOP);
+
+  public static final String QUERY_CREATE_TABLE_SQ_SYSTEM =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQM_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQM_KEY) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQM_VALUE) + " VARCHAR(64) "
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_NAME) + " VARCHAR(64)"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONFIGURABLE =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_NAME) + " VARCHAR(64),"
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_TYPE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_CLASS) + " VARCHAR(255), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_VERSION) + " VARCHAR(64), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CONFIGURABLE_UNIQUE_NAME)
+          + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_NAME) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONNECTOR_DIRECTIONS_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_CONNECTOR) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_DIRECTION) + " BIGINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCD_SQC_NAME)
+          + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_CONNECTOR) + ") REFERENCES "
+          + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME) + "("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCD_SQD_NAME)
+          + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCD_DIRECTION) + ") REFERENCES "
+          + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME) + "("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_CONFIGURABLE) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_NAME) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_TYPE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_INDEX) + " SMALLINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CFG_SQC_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_CONFIGURABLE) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + "),"
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CONFIG_UNIQUE_NAME_TYPE_CONFIGURABLE)
+            + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_NAME) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_TYPE) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_CONFIGURABLE) + ") "
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_DIRECTIONS_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_CONFIG) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_DIRECTION) + " BIGINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_CONFIG) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_DIR_DIRECTION) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQD_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_INPUT =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_NAME) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_INDEX) + " SMALLINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_TYPE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_STRMASK) + " BOOLEAN, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_STRLENGTH) + " SMALLINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_EDITABLE) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ENUMVALS) + " VARCHAR(100), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQI_SQ_CFG_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIG_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_CFG_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_INPUT_UNIQUE_NAME_TYPE_CONFIG)
+            + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_NAME) + ", "
+                          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_TYPE) + ", "
+                          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG) + ") "
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_INPUT_RELATION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_RELATION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQIR_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQIR_PARENT) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQIR_CHILD) + " BIGINT, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQIR_PARENT_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQIR_PARENT + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQI_ID + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQIR_CHILD_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQIR_CHILD + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQI_ID + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_LINK =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CONFIGURABLE) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_NAME) + " VARCHAR(32), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CREATION_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CREATION_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_UPDATE_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_UPDATE_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ENABLED) + " BOOLEAN DEFAULT TRUE, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME)
+          + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_NAME) + "),"
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNK_SQC_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_CONFIGURABLE) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONFIGURABLE_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQC_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_JOB =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_FROM_LINK) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_TO_LINK) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_NAME) + " VARCHAR(64), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_CREATION_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_CREATION_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_UPDATE_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_UPDATE_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ENABLED) + " BOOLEAN DEFAULT TRUE, "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQB_NAME_UNIQUE_NAME)
+          + " UNIQUE (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_NAME) + "),"
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQB_SQ_LNK_FROM_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_FROM_LINK) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQB_SQ_LNK_TO_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_TO_LINK) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_LINK_INPUT =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_INPUT_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_LINK) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_INPUT) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_VALUE) + " VARCHAR(1000), "
+          + "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_LINK) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_INPUT) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNKI_SQ_LNK_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_LINK) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_LINK_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNK_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQ_LNKI_SQI_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_INPUT) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_INPUT_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_JOB) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_INPUT) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_VALUE) + " VARCHAR(1000), "
+          + "PRIMARY KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_JOB) + ", "
+            + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_INPUT) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQBI_SQB_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_JOB) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ID) + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQBI_SQI_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQBI_INPUT) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQI_ID) + ")"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_JOB) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_STATUS) + " VARCHAR(20), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_CREATION_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_CREATION_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_UPDATE_DATE) + " TIMESTAMP, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_UPDATE_USER) + " VARCHAR(32) DEFAULT NULL, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_EXTERNAL_ID) + " VARCHAR(50), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_EXTERNAL_LINK) + " VARCHAR(150), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ERROR_SUMMARY) + " VARCHAR(150), "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ERROR_DETAILS) + " VARCHAR(750), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQS_SQB_NAME)
+            + " FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_JOB) + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_JOB_NAME)
+              + "("  + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQB_ID) + ") ON DELETE CASCADE"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_GROUP_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQG_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQG_NAME) + " VARCHAR(75) UNIQUE"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQR_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQR_NAME) + " VARCHAR(75) UNIQUE"
+          + ")";
+
+  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
+      "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_SUBMISSION_NAME) + " ("
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_GROUP) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_COUNTER) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_SUBMISSION) + " BIGINT, "
+          + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQRS_VALUE) + " BIGINT, "
+          + "PRIMARY KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_GROUP + ", " + CommonRepositorySchemaConstants.COLUMN_SQRS_COUNTER + ", " + CommonRepositorySchemaConstants.COLUMN_SQRS_SUBMISSION + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQRS_SQG_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_GROUP + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_GROUP_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQG_ID + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQRS_SQR_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_COUNTER + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_COUNTER_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQR_ID + "), "
+          + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQRS_SQS_NAME)
+            + " FOREIGN KEY (" + CommonRepositorySchemaConstants.COLUMN_SQRS_SUBMISSION + ") REFERENCES "
+              + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME) + "("  + CommonRepositorySchemaConstants.COLUMN_SQS_ID + ") ON DELETE CASCADE"
+          + ")";
+
+   // DDL: Create table SQ_CONTEXT_TYPE
+   public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_TYPE =
+       "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE) + " ("
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_NAME) + " VARCHAR(25) UNIQUE"
+           + ")";
+
+   // DDL: Create table SQ_CONTEXT_PROPERTY
+   public static final String QUERY_CREATE_TABLE_SQ_CONTEXT_PROPERTY =
+       "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY) + " ("
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+           + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_NAME) + " VARCHAR(500) UNIQUE"
+           + ")";
+
+   // DDL: Create table SQ_CONTEXT
+   public static final String QUERY_CREATE_TABLE_SQ_CONTEXT =
+       "CREATE TABLE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT) + " ("
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_ID) + " BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + " BIGINT, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + " BIGINT, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + " BIGINT, "
+       + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_VALUE) + " VARCHAR(500), "
+       + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQS_ID) + " "
+         + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_SUBMISSION) + ") "
+           + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SUBMISSION_NAME)
+           + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQS_ID) + ") ON DELETE CASCADE, "
+       + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCT_ID) + " "
+         + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_TYPE) + ") "
+           + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_TYPE)
+           + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCT_ID) + "), "
+       + "CONSTRAINT " + CommonRepoUtils.escapeConstraintName(CommonRepositorySchemaConstants.CONSTRAINT_SQCO_SQCP_ID) + " "
+         + "FOREIGN KEY (" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCO_PROPERTY) + ") "
+           + "REFERENCES " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_CONTEXT_PROPERTY)
+           + "(" + CommonRepoUtils.escapeColumnName(CommonRepositorySchemaConstants.COLUMN_SQCP_ID) + ") "
+       + ")";
+
+  private MySqlSchemaCreateQuery() {
+    // Disable explicit object creation
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java
new file mode 100644
index 0000000..7394a83
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MySqlSchemaQuery.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import org.apache.sqoop.repository.common.CommonRepoUtils;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+
+/**
+ * DML for MySql repository.
+ */
+public final class MySqlSchemaQuery {
+
+  public static final String STMT_SELECT_SYSTEM =
+      "SELECT "
+          + CommonRepositorySchemaConstants.COLUMN_SQM_VALUE
+          + " FROM " + CommonRepoUtils.getTableName(CommonRepositorySchemaConstants.SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME)
+          + " WHERE " + CommonRepositorySchemaConstants.COLUMN_SQM_KEY + " = ?";
+
+  public static final String STMT_INSERT_ON_DUPLICATE_KEY_SYSTEM =
+      "INSERT INTO " + CommonRepoUtils.getTableName(CommonRepositorySchemaConstants.SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_SYSTEM_NAME) + "("
+          + CommonRepositorySchemaConstants.COLUMN_SQM_KEY + ", "
+          + CommonRepositorySchemaConstants.COLUMN_SQM_VALUE + ") "
+          + "VALUES(?, ?) ON DUPLICATE KEY UPDATE " + CommonRepositorySchemaConstants.COLUMN_SQM_VALUE + " = ?";
+
+  public static final String STMT_INSERT_DIRECTION =
+      "INSERT INTO " + CommonRepoUtils.getTableName(CommonRepositorySchemaConstants.SCHEMA_SQOOP, CommonRepositorySchemaConstants.TABLE_SQ_DIRECTION_NAME)
+          + " (" + CommonRepositorySchemaConstants.COLUMN_SQD_NAME+ ") VALUES (?)";
+
+  private MySqlSchemaQuery() {
+    // disable explicit object creation
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java
new file mode 100644
index 0000000..4c295c0
--- /dev/null
+++ b/repository/repository-mysql/src/main/java/org/apache/sqoop/repository/mysql/MysqlRepositoryInsertUpdateDeleteSelectQuery.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.mysql;
+
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_CONFIG;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_EDITABLE;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_ENUMVALS;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_ID;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_INDEX;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_NAME;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_STRLENGTH;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_STRMASK;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQI_TYPE;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.SCHEMA_SQOOP;
+import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.TABLE_SQ_INPUT_NAME;
+
+import org.apache.sqoop.repository.common.CommonRepoUtils;
+import org.apache.sqoop.repository.common.CommonRepositoryInsertUpdateDeleteSelectQuery;
+
+public class MysqlRepositoryInsertUpdateDeleteSelectQuery extends
+    CommonRepositoryInsertUpdateDeleteSelectQuery {
+
+  // DML: Get inputs for a given config
+  private static final String STMT_SELECT_INPUT = "SELECT "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_ID) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_NAME) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_CONFIG) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_INDEX) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_TYPE) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_STRMASK) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_STRLENGTH) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_EDITABLE) + ", "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_ENUMVALS) + ", "
+      + "cast(null as char(100))" + " FROM "
+      + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_INPUT_NAME)
+      + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_CONFIG)
+      + " = ?" + " ORDER BY "
+      + CommonRepoUtils.escapeColumnName(COLUMN_SQI_INDEX);
+
+  @Override
+  public String getStmtSelectInput() {
+    return STMT_SELECT_INPUT;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java
new file mode 100644
index 0000000..0bb3c63
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestCase.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.test.db.DatabaseProvider;
+import org.apache.sqoop.common.test.db.MySQLProvider;
+import org.apache.sqoop.json.DriverBean;
+import org.apache.sqoop.model.InputEditable;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MMapInput;
+import org.apache.sqoop.model.MStringInput;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.model.MToConfig;
+import org.apache.sqoop.repository.common.CommonRepositorySchemaConstants;
+import org.apache.sqoop.repository.mysql.MySqlRepositoryHandler;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Abstract class with convenience methods for testing mysql repository.
+ */
+abstract public class MySqlTestCase extends TestCase {
+
+  public static DatabaseProvider provider;
+  public static MySqlTestUtils utils;
+  public MySqlRepositoryHandler handler;
+
+  @BeforeClass
+  public void setUpClass() {
+    provider = new MySQLProvider();
+    utils = new MySqlTestUtils(provider);
+  }
+
+  @BeforeMethod(alwaysRun = true)
+  public void setUp() throws Exception {
+    provider.start();
+
+    handler = new MySqlRepositoryHandler();
+    handler.createOrUpgradeRepository(provider.getConnection());
+    utils.setDatabase(CommonRepositorySchemaConstants.SCHEMA_SQOOP);
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void tearDown() throws Exception {
+    provider.dropDatabase("SQOOP");
+    provider.stop();
+  }
+
+  protected MConnector getConnector(String name, String className,
+      String version, boolean from, boolean to) {
+    return new MConnector(name, className, version, getLinkConfig(),
+        from ? getFromConfig() : null, to ? getToConfig() : null);
+  }
+
+  protected MDriver getDriver() {
+    return new MDriver(getDriverConfig(), DriverBean.CURRENT_DRIVER_VERSION);
+  }
+
+  protected MLink getLink(String name, MConnector connector) {
+    MLink link = new MLink(connector.getPersistenceId(),
+        connector.getLinkConfig());
+    link.setName(name);
+    fillLink(link);
+    return link;
+  }
+
+  protected MJob getJob(String name, MConnector connectorA,
+      MConnector connectorB, MLink linkA, MLink linkB) {
+    MDriver driver = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection());
+    MJob job = new MJob(connectorA.getPersistenceId(),
+        connectorB.getPersistenceId(), linkA.getPersistenceId(),
+        linkB.getPersistenceId(), connectorA.getFromConfig(),
+        connectorB.getToConfig(), driver.getDriverConfig());
+    job.setName(name);
+    fillJob(job);
+
+    return job;
+  }
+
+  protected MSubmission getSubmission(MJob job,
+      SubmissionStatus submissionStatus) {
+    MSubmission submission = new MSubmission(job.getPersistenceId(),
+        new Date(), submissionStatus);
+    fillSubmission(submission);
+    return submission;
+  }
+
+  protected void fillLink(MLink link) {
+    List<MConfig> configs = link.getConnectorLinkConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+  }
+
+  protected void fillJob(MJob job) {
+    List<MConfig> configs = job.getFromJobConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+
+    configs = job.getToJobConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+
+    configs = job.getDriverConfig().getConfigs();
+    ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
+    ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
+  }
+
+  protected void fillSubmission(MSubmission submission) {
+    Counters counters = new Counters();
+    counters.addCounterGroup(new CounterGroup("test-1"));
+    counters.addCounterGroup(new CounterGroup("test-2"));
+    submission.setCounters(counters);
+  }
+
+  protected MLinkConfig getLinkConfig() {
+    return new MLinkConfig(getConfigs("l1", "l2"));
+  }
+
+  protected MFromConfig getFromConfig() {
+    return new MFromConfig(getConfigs("from1", "from2"));
+  }
+
+  protected MToConfig getToConfig() {
+    return new MToConfig(getConfigs("to1", "to2"));
+  }
+
+  protected MDriverConfig getDriverConfig() {
+    return new MDriverConfig(getConfigs("d1", "d2"));
+  }
+
+  protected List<MConfig> getConfigs(String configName1, String configName2) {
+    List<MConfig> configs = new LinkedList<MConfig>();
+
+    List<MInput<?>> inputs = new LinkedList<MInput<?>>();
+    MInput<?> input = new MStringInput("I1", false, InputEditable.ANY,
+        StringUtils.EMPTY, (short) 30);
+    inputs.add(input);
+    input = new MMapInput("I2", false, InputEditable.ANY, "I1");
+    inputs.add(input);
+    configs.add(new MConfig(configName1, inputs));
+
+    inputs = new LinkedList<MInput<?>>();
+    input = new MStringInput("I3", false, InputEditable.ANY, "I4", (short) 30);
+    inputs.add(input);
+    input = new MMapInput("I4", false, InputEditable.ANY, "I3");
+    inputs.add(input);
+    configs.add(new MConfig(configName2, inputs));
+
+    return configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java
new file mode 100644
index 0000000..3a16135
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/MySqlTestUtils.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.sqoop.common.test.db.DatabaseProvider;
+
+public class MySqlTestUtils {
+
+  private DatabaseProvider provider;
+
+  public MySqlTestUtils(DatabaseProvider provider) {
+    this.provider = provider;
+  }
+
+  public void setDatabase(String database) throws Exception {
+    provider.getConnection().setCatalog(database);
+  }
+
+  public void assertTableExists(String database, String table) throws Exception {
+    DatabaseMetaData md = provider.getConnection().getMetaData();
+    ResultSet rs = md.getTables(null, database, table, null);
+    while (rs.next()) {
+      if (rs.getString(3).equals(table)) {
+        return;
+      }
+    }
+
+    throw new AssertionError("Could not find table '" + table
+        + "' part of database '" + database + "'");
+  }
+
+  public void assertForeignKey(String database, String table, String column,
+      String foreignKeyTable, String foreignKeyColumn) throws Exception {
+    DatabaseMetaData md = provider.getConnection().getMetaData();
+    ResultSet rs = md.getCrossReference(null, database, table, null, database,
+        foreignKeyTable);
+    while (rs.next()) {
+      if (rs.getString(4).equals(column)
+          && rs.getString(8).equals(foreignKeyColumn)) {
+        return;
+      }
+    }
+
+    throw new AssertionError("Could not find '" + table + "." + column
+        + "' part of database '" + database + "' with reference to '" + table
+        + "." + column + "'");
+  }
+
+  public void assertUniqueConstraints(String database, String table,
+      String... columns) throws Exception {
+    Set<String> columnSet = new TreeSet<String>();
+    Map<String, Set<String>> indexColumnMap = new HashMap<String, Set<String>>();
+
+    for (String column : columns) {
+      columnSet.add(column);
+    }
+
+    DatabaseMetaData md = provider.getConnection().getMetaData();
+    ResultSet rs = md.getIndexInfo(null, database, table, true, false);
+
+    // Get map of index => columns
+    while (rs.next()) {
+      String indexName = rs.getString(6);
+      String columnName = rs.getString(9);
+      if (!indexColumnMap.containsKey(indexName)) {
+        indexColumnMap.put(indexName, new TreeSet<String>());
+      }
+      indexColumnMap.get(indexName).add(columnName);
+    }
+
+    // Validate unique constraints
+    for (String index : indexColumnMap.keySet()) {
+      if (indexColumnMap.get(index).equals(columnSet)) {
+        return;
+      }
+    }
+
+    throw new AssertionError("Could not find unique constraint on table '"
+        + table + "' part of database '" + database
+        + "' with reference to columns '" + columnSet + "'");
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java
new file mode 100644
index 0000000..8e1b3d1
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestConnectorHandling.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.repository.mysql;
+
+import java.util.List;
+
+import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.model.MConnector;
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestConnectorHandling extends MySqlTestCase {
+
+  @Test
+  public void testFindConnector() throws Exception {
+    // On empty repository, no connectors should be there
+    assertNull(handler.findConnector("A", provider.getConnection()));
+
+    // Register a single connector
+    handler.registerConnector(
+        getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
+        provider.getConnection());
+
+    // Retrieve it and compare with original
+    MConnector connector = handler.findConnector("A", provider.getConnection());
+    assertNotNull(connector);
+    assertEquals(
+        getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
+        connector);
+  }
+
+  @Test
+  public void testFindAllConnectors() throws Exception {
+    // No connectors in an empty repository, we expect an empty list
+    assertEquals(handler.findConnectors(provider.getConnection()).size(), 0);
+
+    // Register connectors
+    handler.registerConnector(
+        getConnector("A", "org.apache.sqoop.test.A", "1.0-test", true, true),
+        provider.getConnection());
+    handler.registerConnector(
+        getConnector("B", "org.apache.sqoop.test.B", "1.0-test", true, true),
+        provider.getConnection());
+
+    // loadConfigurables();
+    // Retrieve connectors
+    List<MConnector> connectors = handler.findConnectors(provider
+        .getConnection());
+    assertNotNull(connectors);
+    assertEquals(connectors.size(), 2);
+    assertEquals(connectors.get(0).getUniqueName(), "A");
+    assertEquals(connectors.get(1).getUniqueName(), "B");
+  }
+
+  @Test
+  public void testRegisterConnector() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", true, true);
+    handler.registerConnector(connector, provider.getConnection());
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 6);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 12);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 9);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testFromDirection() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", true, false);
+
+    handler.registerConnector(connector, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 4);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 8);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 6);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testToDirection() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", false, true);
+
+    handler.registerConnector(connector, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 4);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 8);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 6);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+
+  @Test
+  public void testNeitherDirection() throws Exception {
+    MConnector connector = getConnector("A", "org.apache.sqoop.test.A",
+        "1.0-test", false, false);
+
+    handler.registerConnector(connector, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, connector.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 2);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 4);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 3);
+
+    // Registered connector should be easily recovered back
+    MConnector retrieved = handler.findConnector("A", provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(connector, retrieved);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java
new file mode 100644
index 0000000..c3144ef
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestDriverHandling.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.integration.repository.mysql;
+
+import org.apache.sqoop.common.test.db.TableName;
+import org.apache.sqoop.model.MDriver;
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestDriverHandling extends MySqlTestCase {
+
+  private static final Object CURRENT_DRIVER_VERSION = "1";
+
+  @Test
+  public void testFindDriver() throws Exception {
+    // On empty repository, no driverConfig should be there
+    assertNull(handler
+        .findDriver(MDriver.DRIVER_NAME, provider.getConnection()));
+
+    // Register driver
+    handler.registerDriver(getDriver(), provider.getConnection());
+
+    // Retrieve it
+    MDriver driver = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection());
+    assertNotNull(driver);
+    assertNotNull(driver.getDriverConfig());
+    assertEquals("1", driver.getVersion());
+    assertEquals("1", driver.getVersion());
+
+    // Compare with original
+    assertEquals(getDriver().getDriverConfig(), driver.getDriverConfig());
+  }
+
+  @Test
+  public void testRegisterDriver() throws Exception {
+    MDriver driver = getDriver();
+    handler.registerDriver(driver, provider.getConnection());
+
+    // Connector should get persistence ID
+    assertEquals(1, driver.getPersistenceId());
+
+    // Now check content in corresponding tables
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIGURABLE")),
+        1);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_CONFIG")), 2);
+    assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_INPUT")), 4);
+    assertEquals(
+        provider.rowCount(new TableName("SQOOP", "SQ_INPUT_RELATION")), 3);
+
+    // Registered driver and config should be easily recovered back
+    MDriver retrieved = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection());
+    assertNotNull(retrieved);
+    assertEquals(driver, retrieved);
+    assertEquals(driver.getVersion(), retrieved.getVersion());
+  }
+
+  @Test
+  public void testDriverVersionUpgrade() throws Exception {
+    MDriver driver = getDriver();
+    handler.registerDriver(driver, provider.getConnection());
+    String registeredDriverVersion = handler.findDriver(MDriver.DRIVER_NAME,
+        provider.getConnection()).getVersion();
+    assertEquals(CURRENT_DRIVER_VERSION, registeredDriverVersion);
+    driver.setVersion("2");
+    handler.upgradeDriverAndConfigs(driver, provider.getConnection());
+    assertEquals("2", driver.getVersion());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/668703cf/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java
new file mode 100644
index 0000000..bfa53bd
--- /dev/null
+++ b/repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.integration.repository.mysql;
+
+import org.testng.annotations.Test;
+
+/**
+ * Test driver methods on MySql repository.
+ */
+@Test(groups = "mysql")
+public class TestHandler extends MySqlTestCase {
+
+  @Test
+  public void testHasLatestRepositoryVersion() throws Exception {
+    assertTrue(handler.isRepositorySuitableForUse(provider.getConnection()));
+  }
+
+  @Test
+  public void testDoubleUpdate() throws Exception {
+    handler.createOrUpgradeRepository(provider.getConnection());
+    assertTrue(handler.isRepositorySuitableForUse(provider.getConnection()));
+  }
+}