You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:12 UTC

[20/50] [abbrv] SQOOP-1496: Sqoop2: Revisit/Refactor the SubmissionEngine/ExecutionEngine APIs

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/test/java/org/apache/sqoop/framework/TestJobManager.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestJobManager.java b/core/src/test/java/org/apache/sqoop/framework/TestJobManager.java
new file mode 100644
index 0000000..2732b1c
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/framework/TestJobManager.java
@@ -0,0 +1,173 @@
+package org.apache.sqoop.framework;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.repository.JdbcRepository;
+import org.apache.sqoop.repository.Repository;
+import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.request.HttpEventContext;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJobManager {
+  private JobManager jobManager;
+  private SqoopConnector sqoopConnectorMock;
+  private ConnectorManager connectorMgrMock;
+  private RepositoryManager repositoryManagerMock;
+  private Repository jdbcRepoMock;
+
+  @Before
+  public void setUp() {
+    jobManager = new JobManager();
+    connectorMgrMock = mock(ConnectorManager.class);
+    sqoopConnectorMock = mock(SqoopConnector.class);
+    ConnectorManager.setInstance(connectorMgrMock);
+    repositoryManagerMock = mock(RepositoryManager.class);
+    RepositoryManager.setInstance(repositoryManagerMock);
+    jdbcRepoMock = mock(JdbcRepository.class);
+  }
+
+  @Test
+  public void testCreateJobSubmission() {
+
+    HttpEventContext testCtx = new HttpEventContext();
+    testCtx.setUsername("testUser");
+    MSubmission jobSubmission = jobManager.createJobSubmission(testCtx, 1234L);
+    assertEquals(jobSubmission.getCreationUser(), "testUser");
+    assertEquals(jobSubmission.getLastUpdateUser(), "testUser");
+  }
+
+  @Test
+  public void testGetConnector() {
+    when(connectorMgrMock.getConnector(123l)).thenReturn(sqoopConnectorMock);
+    when(sqoopConnectorMock.getSupportedDirections()).thenReturn(getSupportedDirections());
+    assertEquals(jobManager.getConnector(123l), sqoopConnectorMock);
+    verify(connectorMgrMock, times(1)).getConnector(123l);
+  }
+
+  @Test
+  public void testUnsupportedDirectionForConnector() {
+    // invalid job id/ direction
+    SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: "
+        + sqoopConnectorMock.getClass().getCanonicalName());
+    List<Direction> supportedDirections = getSupportedDirections();
+    when(sqoopConnectorMock.getSupportedDirections()).thenReturn(supportedDirections);
+
+    try {
+      // invalid direction
+      jobManager.validateSupportedDirection(sqoopConnectorMock, null);
+    } catch (SqoopException ex) {
+      assertEquals(ex.getMessage(), exception.getMessage());
+      verify(sqoopConnectorMock, times(1)).getSupportedDirections();
+      return;
+    }
+
+    fail("Should throw out an exception with message: " + exception.getMessage());
+  }
+
+  @Test
+  public void testGetConnection() {
+    MConnection testConnection = new MConnection(123l, null, null);
+    testConnection.setEnabled(true);
+    MConnection mConnectionSpy = org.mockito.Mockito.spy(testConnection);
+    when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
+    when(jdbcRepoMock.findConnection(123l)).thenReturn(mConnectionSpy);
+    assertEquals(jobManager.getConnection(123l), mConnectionSpy);
+    verify(repositoryManagerMock, times(1)).getRepository();
+    verify(jdbcRepoMock, times(1)).findConnection(123l);
+  }
+
+  @Test
+  public void testDisabledConnection() {
+    MConnection testConnection = new MConnection(123l, null, null);
+    testConnection.setPersistenceId(1234);
+    testConnection.setEnabled(false);
+    SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: "
+        + testConnection.getPersistenceId());
+
+    MConnection mConnectionSpy = org.mockito.Mockito.spy(testConnection);
+    when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
+    when(jdbcRepoMock.findConnection(123l)).thenReturn(mConnectionSpy);
+    try {
+      jobManager.getConnection(123l);
+    } catch (SqoopException ex) {
+      assertEquals(ex.getMessage(), exception.getMessage());
+      verify(repositoryManagerMock, times(1)).getRepository();
+      verify(jdbcRepoMock, times(1)).findConnection(123l);
+    }
+  }
+
+  @Test
+  public void testGetJob() {
+    MJob testJob = job(123l, 456l);
+    testJob.setEnabled(true);
+    MJob mJobSpy = org.mockito.Mockito.spy(testJob);
+    when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
+    when(jdbcRepoMock.findJob(123l)).thenReturn(mJobSpy);
+    assertEquals(jobManager.getJob(123l), mJobSpy);
+    verify(repositoryManagerMock, times(1)).getRepository();
+    verify(jdbcRepoMock, times(1)).findJob(123l);
+  }
+
+  @Test
+  public void testDisabledJob() {
+    MJob testJob = job(123l, 456l);
+    testJob.setEnabled(false);
+    testJob.setPersistenceId(1111);
+    SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: "
+        + testJob.getPersistenceId());
+
+    MJob mJobSpy = org.mockito.Mockito.spy(testJob);
+    when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
+    when(jdbcRepoMock.findJob(123l)).thenReturn(mJobSpy);
+    try {
+      jobManager.getJob(123l);
+    } catch (SqoopException ex) {
+      assertEquals(ex.getMessage(), exception.getMessage());
+      verify(repositoryManagerMock, times(1)).getRepository();
+      verify(jdbcRepoMock, times(1)).findJob(123l);
+    }
+  }
+
+  @Test
+  public void testUnknownJob() {
+    long testJobId = 555l;
+    SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: "
+        + testJobId);
+    when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
+    when(jdbcRepoMock.findJob(testJobId)).thenReturn(null);
+    try {
+      jobManager.getJob(testJobId);
+    } catch (SqoopException ex) {
+      assertEquals(ex.getMessage(), exception.getMessage());
+      verify(repositoryManagerMock, times(1)).getRepository();
+      verify(jdbcRepoMock, times(1)).findJob(testJobId);
+    }
+  }
+
+  private MJob job(long fromId, long toId) {
+    MJob job = new MJob(fromId, toId, 1L, 2L, null, null, null);
+    job.setName("Vampire");
+    job.setCreationUser("Buffy");
+    return job;
+  }
+
+  public List<Direction> getSupportedDirections() {
+    return Arrays.asList(new Direction[] { Direction.FROM, Direction.TO });
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/test/java/org/apache/sqoop/framework/TestJobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestJobRequest.java b/core/src/test/java/org/apache/sqoop/framework/TestJobRequest.java
new file mode 100644
index 0000000..6ca1c6a
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/framework/TestJobRequest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.utils.ClassUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ */
+public class TestJobRequest {
+
+  private JobRequest jobRequest;
+
+  @Before
+  public void initializeSubmissionRequest() {
+    jobRequest = new JobRequest();
+  }
+
+  @Test
+  public void testAddJar() {
+    jobRequest.addJar("A");
+    jobRequest.addJar("B");
+    jobRequest.addJar("A");
+
+    assertEquals(2, jobRequest.getJars().size());
+    assertEquals("A", jobRequest.getJars().get(0));
+    assertEquals("B", jobRequest.getJars().get(1));
+  }
+
+  @Test
+  public void testAddJarForClass() {
+    jobRequest.addJarForClass(TestJobRequest.class);
+    jobRequest.addJarForClass(TestFrameworkValidator.class);
+
+    assertEquals(1, jobRequest.getJars().size());
+    assertTrue(jobRequest.getJars().contains(ClassUtils.jarForClass(TestJobRequest.class)));
+  }
+
+  @Test
+  public void testAddJars() {
+    jobRequest.addJars(Arrays.asList("A", "B"));
+    jobRequest.addJars(Arrays.asList("B", "C"));
+
+    assertEquals(3, jobRequest.getJars().size());
+    assertEquals("A", jobRequest.getJars().get(0));
+    assertEquals("B", jobRequest.getJars().get(1));
+    assertEquals("C", jobRequest.getJars().get(2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/test/java/org/apache/sqoop/framework/TestSubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestSubmissionRequest.java b/core/src/test/java/org/apache/sqoop/framework/TestSubmissionRequest.java
deleted file mode 100644
index 3078ed2..0000000
--- a/core/src/test/java/org/apache/sqoop/framework/TestSubmissionRequest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.utils.ClassUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- *
- */
-public class TestSubmissionRequest {
-
-  private SubmissionRequest submissionRequest;
-
-  @Before
-  public void initializeSubmissionRequest() {
-    submissionRequest = new SubmissionRequest();
-  }
-
-  @Test
-  public void testAddJar() {
-    submissionRequest.addJar("A");
-    submissionRequest.addJar("B");
-    submissionRequest.addJar("A");
-
-    assertEquals(2, submissionRequest.getJars().size());
-    assertEquals("A", submissionRequest.getJars().get(0));
-    assertEquals("B", submissionRequest.getJars().get(1));
-  }
-
-  @Test
-  public void testAddJarForClass() {
-    submissionRequest.addJarForClass(TestSubmissionRequest.class);
-    submissionRequest.addJarForClass(TestFrameworkValidator.class);
-
-    assertEquals(1, submissionRequest.getJars().size());
-    assertTrue(submissionRequest.getJars().contains(ClassUtils.jarForClass(TestSubmissionRequest.class)));
-  }
-
-  @Test
-  public void testAddJars() {
-    submissionRequest.addJars(Arrays.asList("A", "B"));
-    submissionRequest.addJars(Arrays.asList("B", "C"));
-
-    assertEquals(3, submissionRequest.getJars().size());
-    assertEquals("A", submissionRequest.getJars().get(0));
-    assertEquals("B", submissionRequest.getJars().get(1));
-    assertEquals("C", submissionRequest.getJars().get(2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index 50daa62..0fcae75 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -52,39 +52,39 @@ import static org.mockito.Mockito.*;
 
 public class TestJdbcRepository {
 
-  private JdbcRepository repo;
-  private JdbcRepositoryTransaction tx;
-  private ConnectorManager connectorMgr;
-  private FrameworkManager frameworkMgr;
-  private JdbcRepositoryHandler repoHandler;
-  private Validator validator;
-  private MetadataUpgrader upgrader;
+  private JdbcRepository repoSpy;
+  private JdbcRepositoryTransaction repoTransactionMock;
+  private ConnectorManager connectorMgrMock;
+  private FrameworkManager frameworkMgrMock;
+  private JdbcRepositoryHandler repoHandlerMock;
+  private Validator validatorMock;
+  private MetadataUpgrader upgraderMock;
 
-  private Validation valid;
-  private Validation invalid;
+  private Validation validRepoMock;
+  private Validation invalidRepoMock;
 
   @Before
   public void setUp() throws Exception {
-    tx = mock(JdbcRepositoryTransaction.class);
-    connectorMgr = mock(ConnectorManager.class);
-    frameworkMgr = mock(FrameworkManager.class);
-    repoHandler = mock(JdbcRepositoryHandler.class);
-    validator = mock(Validator.class);
-    upgrader = mock(MetadataUpgrader.class);
-    repo = spy(new JdbcRepository(repoHandler, null));
+    repoTransactionMock = mock(JdbcRepositoryTransaction.class);
+    connectorMgrMock = mock(ConnectorManager.class);
+    frameworkMgrMock = mock(FrameworkManager.class);
+    repoHandlerMock = mock(JdbcRepositoryHandler.class);
+    validatorMock = mock(Validator.class);
+    upgraderMock = mock(MetadataUpgrader.class);
+    repoSpy = spy(new JdbcRepository(repoHandlerMock, null));
 
     // setup transaction and connector manager
-    doReturn(tx).when(repo).getTransaction();
-    ConnectorManager.setInstance(connectorMgr);
-    FrameworkManager.setInstance(frameworkMgr);
+    doReturn(repoTransactionMock).when(repoSpy).getTransaction();
+    ConnectorManager.setInstance(connectorMgrMock);
+    FrameworkManager.setInstance(frameworkMgrMock);
 
-    valid = mock(Validation.class);
-    when(valid.getStatus()).thenReturn(Status.ACCEPTABLE);
-    invalid = mock(Validation.class);
-    when(invalid.getStatus()).thenReturn(Status.UNACCEPTABLE);
+    validRepoMock = mock(Validation.class);
+    when(validRepoMock.getStatus()).thenReturn(Status.ACCEPTABLE);
+    invalidRepoMock = mock(Validation.class);
+    when(invalidRepoMock.getStatus()).thenReturn(Status.UNACCEPTABLE);
 
-    doNothing().when(upgrader).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
-    doNothing().when(upgrader).upgrade(any(MJobForms.class), any(MJobForms.class));
+    doNothing().when(upgraderMock).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
+    doNothing().when(upgraderMock).upgrade(any(MJobForms.class), any(MJobForms.class));
   }
 
   /**
@@ -95,20 +95,20 @@ public class TestJdbcRepository {
     MConnector newConnector = connector(1, "1.1");
     MConnector oldConnector = connector(1, "1.0");
 
-    when(repoHandler.findConnector(anyString(), any(Connection.class))).thenReturn(oldConnector);
+    when(repoHandlerMock.findConnector(anyString(), any(Connection.class))).thenReturn(oldConnector);
 
     // make the upgradeConnector to throw an exception to prove that it has been called
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "upgradeConnector() has been called.");
-    doThrow(exception).when(connectorMgr).getConnector(anyString());
+    doThrow(exception).when(connectorMgrMock).getConnector(anyString());
 
     try {
-      repo.registerConnector(newConnector, true);
+      repoSpy.registerConnector(newConnector, true);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnector(anyString(), any(Connection.class));
-      verify(connectorMgr, times(1)).getConnector(anyString());
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnector(anyString(), any(Connection.class));
+      verify(connectorMgrMock, times(1)).getConnector(anyString());
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -123,13 +123,13 @@ public class TestJdbcRepository {
     MConnector newConnector = connector(1, "1.1");
     MConnector oldConnector = connector(1);
 
-    when(repoHandler.findConnector(anyString(), any(Connection.class))).thenReturn(oldConnector);
+    when(repoHandlerMock.findConnector(anyString(), any(Connection.class))).thenReturn(oldConnector);
 
     try {
-      repo.registerConnector(newConnector, false);
+      repoSpy.registerConnector(newConnector, false);
     } catch (SqoopException ex) {
-      verify(repoHandler, times(1)).findConnector(anyString(), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnector(anyString(), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0026);
       return ;
     }
@@ -145,20 +145,20 @@ public class TestJdbcRepository {
     MFramework newFramework = framework();
     MFramework oldFramework = anotherFramework();
 
-    when(repoHandler.findFramework(any(Connection.class))).thenReturn(oldFramework);
+    when(repoHandlerMock.findFramework(any(Connection.class))).thenReturn(oldFramework);
 
     // make the upgradeFramework to throw an exception to prove that it has been called
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "upgradeFramework() has been called.");
-    doThrow(exception).when(repoHandler).findConnections(any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).findConnections(any(Connection.class));
 
     try {
-      repo.registerFramework(newFramework, true);
+      repoSpy.registerFramework(newFramework, true);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findFramework(any(Connection.class));
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findFramework(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -173,14 +173,14 @@ public class TestJdbcRepository {
     MFramework newFramework = framework();
     MFramework oldFramework = anotherFramework();
 
-    when(repoHandler.findFramework(any(Connection.class))).thenReturn(oldFramework);
+    when(repoHandlerMock.findFramework(any(Connection.class))).thenReturn(oldFramework);
 
     try {
-      repo.registerFramework(newFramework, false);
+      repoSpy.registerFramework(newFramework, false);
     } catch (SqoopException ex) {
       assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0026);
-      verify(repoHandler, times(1)).findFramework(any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findFramework(any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -198,53 +198,53 @@ public class TestJdbcRepository {
 
     // prepare the sqoop connector
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(validator.validateConnection(any(MConnection.class))).thenReturn(valid);
-    when(validator.validateJob(any(MJob.class))).thenReturn(valid);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
+    when(validatorMock.validateConnection(any(MConnection.class))).thenReturn(validRepoMock);
+    when(validatorMock.validateJob(any(MJob.class))).thenReturn(validRepoMock);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
     when(sqconnector.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
     when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(JobConfiguration.class);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     // prepare the connections and jobs
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
 
     // mock necessary methods for upgradeConnector() procedure
-    doReturn(connectionList).when(repo).findConnectionsForConnector(anyLong());
-    doReturn(jobList).when(repo).findJobsForConnector(anyLong());
-    doNothing().when(repo).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
-    doNothing().when(repo).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repo).updateConnector(any(MConnector.class), any(RepositoryTransaction.class));
-
-    repo.upgradeConnector(oldConnector, newConnector);
-
-    InOrder repoOrder = inOrder(repo);
-    InOrder txOrder = inOrder(tx);
-    InOrder upgraderOrder = inOrder(upgrader);
-    InOrder validatorOrder = inOrder(validator);
-
-    repoOrder.verify(repo, times(1)).findConnectionsForConnector(anyLong());
-    repoOrder.verify(repo, times(1)).findJobsForConnector(anyLong());
-    repoOrder.verify(repo, times(1)).getTransaction();
-    repoOrder.verify(repo, times(1)).deleteJobInputs(1, tx);
-    repoOrder.verify(repo, times(1)).deleteJobInputs(2, tx);
-    repoOrder.verify(repo, times(1)).deleteConnectionInputs(1, tx);
-    repoOrder.verify(repo, times(1)).deleteConnectionInputs(2, tx);
-    repoOrder.verify(repo, times(1)).updateConnector(any(MConnector.class), any(RepositoryTransaction.class));
-    repoOrder.verify(repo, times(2)).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
-    repoOrder.verify(repo, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
+    doReturn(connectionList).when(repoSpy).findConnectionsForConnector(anyLong());
+    doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong());
+    doNothing().when(repoSpy).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).updateConnector(any(MConnector.class), any(RepositoryTransaction.class));
+
+    repoSpy.upgradeConnector(oldConnector, newConnector);
+
+    InOrder repoOrder = inOrder(repoSpy);
+    InOrder txOrder = inOrder(repoTransactionMock);
+    InOrder upgraderOrder = inOrder(upgraderMock);
+    InOrder validatorOrder = inOrder(validatorMock);
+
+    repoOrder.verify(repoSpy, times(1)).findConnectionsForConnector(anyLong());
+    repoOrder.verify(repoSpy, times(1)).findJobsForConnector(anyLong());
+    repoOrder.verify(repoSpy, times(1)).getTransaction();
+    repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).deleteConnectionInputs(1, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).deleteConnectionInputs(2, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).updateConnector(any(MConnector.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(2)).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
-    txOrder.verify(tx, times(1)).begin();
-    txOrder.verify(tx, times(1)).commit();
-    txOrder.verify(tx, times(1)).close();
+    txOrder.verify(repoTransactionMock, times(1)).begin();
+    txOrder.verify(repoTransactionMock, times(1)).commit();
+    txOrder.verify(repoTransactionMock, times(1)).close();
     txOrder.verifyNoMoreInteractions();
-    upgraderOrder.verify(upgrader, times(2)).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
-    upgraderOrder.verify(upgrader, times(4)).upgrade(any(MJobForms.class), any(MJobForms.class));
+    upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
+    upgraderOrder.verify(upgraderMock, times(4)).upgrade(any(MJobForms.class), any(MJobForms.class));
     upgraderOrder.verifyNoMoreInteractions();
-    validatorOrder.verify(validator, times(2)).validateConnection(anyObject());
+    validatorOrder.verify(validatorMock, times(2)).validateConnection(anyObject());
     // @TODO(Abe): Re-enable job validation?
-    validatorOrder.verify(validator, times(0)).validateJob(anyObject());
+    validatorOrder.verify(validatorMock, times(0)).validateJob(anyObject());
     validatorOrder.verifyNoMoreInteractions();
   }
 
@@ -321,49 +321,49 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeWithValidConnectionsAndJobs() {
     MFramework newFramework = framework();
 
-    when(validator.validateConnection(any(MConnection.class))).thenReturn(valid);
-    when(validator.validateJob(any(MJob.class))).thenReturn(valid);
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
-    when(frameworkMgr.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
-    when(frameworkMgr.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
+    when(validatorMock.validateConnection(any(MConnection.class))).thenReturn(validRepoMock);
+    when(validatorMock.validateJob(any(MJob.class))).thenReturn(validRepoMock);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(frameworkMgrMock.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
+    when(frameworkMgrMock.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
 
-    doReturn(connectionList).when(repo).findConnections();
-    doReturn(jobList).when(repo).findJobs();
-    doNothing().when(repo).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
-    doNothing().when(repo).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repo).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
-
-    repo.upgradeFramework(newFramework);
-
-    InOrder repoOrder = inOrder(repo);
-    InOrder txOrder = inOrder(tx);
-    InOrder upgraderOrder = inOrder(upgrader);
-    InOrder validatorOrder = inOrder(validator);
-
-    repoOrder.verify(repo, times(1)).findConnections();
-    repoOrder.verify(repo, times(1)).findJobs();
-    repoOrder.verify(repo, times(1)).getTransaction();
-    repoOrder.verify(repo, times(1)).deleteJobInputs(1, tx);
-    repoOrder.verify(repo, times(1)).deleteJobInputs(2, tx);
-    repoOrder.verify(repo, times(1)).deleteConnectionInputs(1, tx);
-    repoOrder.verify(repo, times(1)).deleteConnectionInputs(2, tx);
-    repoOrder.verify(repo, times(1)).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
-    repoOrder.verify(repo, times(2)).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
-    repoOrder.verify(repo, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
+    doReturn(connectionList).when(repoSpy).findConnections();
+    doReturn(jobList).when(repoSpy).findJobs();
+    doNothing().when(repoSpy).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
+
+    repoSpy.upgradeFramework(newFramework);
+
+    InOrder repoOrder = inOrder(repoSpy);
+    InOrder txOrder = inOrder(repoTransactionMock);
+    InOrder upgraderOrder = inOrder(upgraderMock);
+    InOrder validatorOrder = inOrder(validatorMock);
+
+    repoOrder.verify(repoSpy, times(1)).findConnections();
+    repoOrder.verify(repoSpy, times(1)).findJobs();
+    repoOrder.verify(repoSpy, times(1)).getTransaction();
+    repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).deleteConnectionInputs(1, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).deleteConnectionInputs(2, repoTransactionMock);
+    repoOrder.verify(repoSpy, times(1)).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(2)).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
-    txOrder.verify(tx, times(1)).begin();
-    txOrder.verify(tx, times(1)).commit();
-    txOrder.verify(tx, times(1)).close();
+    txOrder.verify(repoTransactionMock, times(1)).begin();
+    txOrder.verify(repoTransactionMock, times(1)).commit();
+    txOrder.verify(repoTransactionMock, times(1)).close();
     txOrder.verifyNoMoreInteractions();
-    upgraderOrder.verify(upgrader, times(2)).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
-    upgraderOrder.verify(upgrader, times(2)).upgrade(any(MJobForms.class), any(MJobForms.class));
+    upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
+    upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MJobForms.class), any(MJobForms.class));
     upgraderOrder.verifyNoMoreInteractions();
-    validatorOrder.verify(validator, times(2)).validateConnection(anyObject());
-    validatorOrder.verify(validator, times(2)).validateJob(anyObject());
+    validatorOrder.verify(validatorMock, times(2)).validateConnection(anyObject());
+    validatorOrder.verify(validatorMock, times(2)).validateJob(anyObject());
     validatorOrder.verifyNoMoreInteractions();
   }
 
@@ -375,50 +375,50 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeWithInvalidConnectionsAndJobs() {
     MFramework newFramework = framework();
 
-    when(validator.validateConnection(any(MConnection.class))).thenReturn(invalid);
-    when(validator.validateJob(any(MJob.class))).thenReturn(invalid);
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
-    when(frameworkMgr.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
-    when(frameworkMgr.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
+    when(validatorMock.validateConnection(any(MConnection.class))).thenReturn(invalidRepoMock);
+    when(validatorMock.validateJob(any(MJob.class))).thenReturn(invalidRepoMock);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(frameworkMgrMock.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
+    when(frameworkMgrMock.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
 
-    doReturn(connectionList).when(repo).findConnections();
-    doReturn(jobList).when(repo).findJobs();
-    doNothing().when(repo).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
-    doNothing().when(repo).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repo).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
+    doReturn(connectionList).when(repoSpy).findConnections();
+    doReturn(jobList).when(repoSpy).findJobs();
+    doNothing().when(repoSpy).updateConnection(any(MConnection.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0027);
 
-      InOrder repoOrder = inOrder(repo);
-      InOrder txOrder = inOrder(tx);
-      InOrder upgraderOrder = inOrder(upgrader);
-      InOrder validatorOrder = inOrder(validator);
-
-      repoOrder.verify(repo, times(1)).findConnections();
-      repoOrder.verify(repo, times(1)).findJobs();
-      repoOrder.verify(repo, times(1)).getTransaction();
-      repoOrder.verify(repo, times(1)).deleteJobInputs(1, tx);
-      repoOrder.verify(repo, times(1)).deleteJobInputs(2, tx);
-      repoOrder.verify(repo, times(1)).deleteConnectionInputs(1, tx);
-      repoOrder.verify(repo, times(1)).deleteConnectionInputs(2, tx);
-      repoOrder.verify(repo, times(1)).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
+      InOrder repoOrder = inOrder(repoSpy);
+      InOrder txOrder = inOrder(repoTransactionMock);
+      InOrder upgraderOrder = inOrder(upgraderMock);
+      InOrder validatorOrder = inOrder(validatorMock);
+
+      repoOrder.verify(repoSpy, times(1)).findConnections();
+      repoOrder.verify(repoSpy, times(1)).findJobs();
+      repoOrder.verify(repoSpy, times(1)).getTransaction();
+      repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
+      repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
+      repoOrder.verify(repoSpy, times(1)).deleteConnectionInputs(1, repoTransactionMock);
+      repoOrder.verify(repoSpy, times(1)).deleteConnectionInputs(2, repoTransactionMock);
+      repoOrder.verify(repoSpy, times(1)).updateFramework(any(MFramework.class), any(RepositoryTransaction.class));
       repoOrder.verifyNoMoreInteractions();
-      txOrder.verify(tx, times(1)).begin();
-      txOrder.verify(tx, times(1)).rollback();
-      txOrder.verify(tx, times(1)).close();
+      txOrder.verify(repoTransactionMock, times(1)).begin();
+      txOrder.verify(repoTransactionMock, times(1)).rollback();
+      txOrder.verify(repoTransactionMock, times(1)).close();
       txOrder.verifyNoMoreInteractions();
-      upgraderOrder.verify(upgrader, times(2)).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
-      upgraderOrder.verify(upgrader, times(2)).upgrade(any(MJobForms.class), any(MJobForms.class));
+      upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConnectionForms.class), any(MConnectionForms.class));
+      upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MJobForms.class), any(MJobForms.class));
       upgraderOrder.verifyNoMoreInteractions();
-      validatorOrder.verify(validator, times(2)).validateConnection(anyObject());
-      validatorOrder.verify(validator, times(2)).validateJob(anyObject());
+      validatorOrder.verify(validatorMock, times(2)).validateConnection(anyObject());
+      validatorOrder.verify(validatorMock, times(2)).validateJob(anyObject());
       validatorOrder.verifyNoMoreInteractions();
       return ;
     }
@@ -436,20 +436,20 @@ public class TestJdbcRepository {
     MConnector oldConnector = connector(1);
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "find connections for connector error.");
-    doThrow(exception).when(repoHandler).findConnectionsForConnector(anyLong(), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).findConnectionsForConnector(anyLong(), any(Connection.class));
 
     try {
-      repo.upgradeConnector(oldConnector, newConnector);
+      repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -466,24 +466,24 @@ public class TestJdbcRepository {
     MConnector oldConnector = connector(1);
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
-    doReturn(connectionList).when(repoHandler).findConnectionsForConnector(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnectionsForConnector(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "find jobs for connector error.");
-    doThrow(exception).when(repoHandler).findJobsForConnector(anyLong(), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
 
     try {
-      repo.upgradeConnector(oldConnector, newConnector);
+      repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -500,27 +500,27 @@ public class TestJdbcRepository {
     MConnector oldConnector = connector(1);
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnectionsForConnector(anyLong(), any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobsForConnector(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnectionsForConnector(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "delete job inputs for connector error.");
-    doThrow(exception).when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
 
     try {
-      repo.upgradeConnector(oldConnector, newConnector);
+      repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).deleteJobInputs(anyLong(), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).deleteJobInputs(anyLong(), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -537,29 +537,29 @@ public class TestJdbcRepository {
     MConnector oldConnector = connector(1);
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnectionsForConnector(anyLong(), any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobsForConnector(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnectionsForConnector(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "delete connection inputs for connector error.");
-    doThrow(exception).when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
 
     try {
-      repo.upgradeConnector(oldConnector, newConnector);
+      repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -576,31 +576,31 @@ public class TestJdbcRepository {
     MConnector oldConnector = connector(1);
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnectionsForConnector(anyLong(), any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobsForConnector(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnectionsForConnector(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update connector error.");
-    doThrow(exception).when(repoHandler).updateConnector(any(MConnector.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).updateConnector(any(MConnector.class), any(Connection.class));
 
     try {
-      repo.upgradeConnector(oldConnector, newConnector);
+      repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -617,39 +617,39 @@ public class TestJdbcRepository {
     MConnector oldConnector = connector(1);
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(validator.validateConnection(any(MConnection.class))).thenReturn(valid);
-    when(validator.validateJob(any(MJob.class))).thenReturn(valid);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
+    when(validatorMock.validateConnection(any(MConnection.class))).thenReturn(validRepoMock);
+    when(validatorMock.validateJob(any(MJob.class))).thenReturn(validRepoMock);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
     when(sqconnector.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
     when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(JobConfiguration.class);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnectionsForConnector(anyLong(), any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobsForConnector(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).updateConnector(any(MConnector.class), any(Connection.class));
-    doReturn(true).when(repoHandler).existsConnection(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnectionsForConnector(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).updateConnector(any(MConnector.class), any(Connection.class));
+    doReturn(true).when(repoHandlerMock).existsConnection(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update connection error.");
-    doThrow(exception).when(repoHandler).updateConnection(any(MConnection.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).updateConnection(any(MConnection.class), any(Connection.class));
 
     try {
-      repo.upgradeConnector(oldConnector, newConnector);
+      repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
-      verify(repoHandler, times(1)).existsConnection(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateConnection(any(MConnection.class), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).existsConnection(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateConnection(any(MConnection.class), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -666,43 +666,43 @@ public class TestJdbcRepository {
     MConnector oldConnector = connector(1);
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
-    when(validator.validateConnection(any(MConnection.class))).thenReturn(valid);
-    when(validator.validateJob(any(MJob.class))).thenReturn(valid);
-    when(sqconnector.getValidator()).thenReturn(validator);
-    when(sqconnector.getMetadataUpgrader()).thenReturn(upgrader);
+    when(validatorMock.validateConnection(any(MConnection.class))).thenReturn(validRepoMock);
+    when(validatorMock.validateJob(any(MJob.class))).thenReturn(validRepoMock);
+    when(sqconnector.getValidator()).thenReturn(validatorMock);
+    when(sqconnector.getMetadataUpgrader()).thenReturn(upgraderMock);
     when(sqconnector.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
     when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(JobConfiguration.class);
-    when(connectorMgr.getConnector(anyString())).thenReturn(sqconnector);
+    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnectionsForConnector(anyLong(), any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobsForConnector(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).updateConnector(any(MConnector.class), any(Connection.class));
-    doNothing().when(repoHandler).updateConnection(any(MConnection.class), any(Connection.class));
-    doReturn(true).when(repoHandler).existsConnection(anyLong(), any(Connection.class));
-    doReturn(true).when(repoHandler).existsJob(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnectionsForConnector(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).updateConnector(any(MConnector.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).updateConnection(any(MConnection.class), any(Connection.class));
+    doReturn(true).when(repoHandlerMock).existsConnection(anyLong(), any(Connection.class));
+    doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update job error.");
-    doThrow(exception).when(repoHandler).updateJob(any(MJob.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).updateJob(any(MJob.class), any(Connection.class));
 
     try {
-      repo.upgradeConnector(oldConnector, newConnector);
+      repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
-      verify(repoHandler, times(2)).existsConnection(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).updateConnection(any(MConnection.class), any(Connection.class));
-      verify(repoHandler, times(1)).existsJob(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateJob(any(MJob.class), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnectionsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(2)).existsConnection(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).updateConnection(any(MConnection.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -717,19 +717,19 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeHandlerFindConnectionsError() {
     MFramework newFramework = framework();
 
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "find connections error.");
-    doThrow(exception).when(repoHandler).findConnections(any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).findConnections(any(Connection.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -744,23 +744,23 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeHandlerFindJobsError() {
     MFramework newFramework = framework();
 
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
-    doReturn(connectionList).when(repoHandler).findConnections(any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnections(any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "find jobs error.");
-    doThrow(exception).when(repoHandler).findJobs(any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verify(repoHandler, times(1)).findJobs(any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -775,26 +775,26 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeHandlerDeleteJobInputsError() {
     MFramework newFramework = framework();
 
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnections(any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobs(any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnections(any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "delete job inputs error.");
-    doThrow(exception).when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verify(repoHandler, times(1)).findJobs(any(Connection.class));
-      verify(repoHandler, times(1)).deleteJobInputs(anyLong(), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
+      verify(repoHandlerMock, times(1)).deleteJobInputs(anyLong(), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -809,28 +809,28 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeHandlerDeleteConnectionInputsError() {
     MFramework newFramework = framework();
 
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnections(any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobs(any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnections(any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "delete connection inputs error.");
-    doThrow(exception).when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verify(repoHandler, times(1)).findJobs(any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -845,30 +845,30 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeHandlerUpdateFrameworkError() {
     MFramework newFramework = framework();
 
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnections(any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobs(any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnections(any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update framework metadata error.");
-    doThrow(exception).when(repoHandler).updateFramework(any(MFramework.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).updateFramework(any(MFramework.class), any(Connection.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verify(repoHandler, times(1)).findJobs(any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateFramework(any(MFramework.class), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateFramework(any(MFramework.class), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -883,38 +883,38 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeHandlerUpdateConnectionError() {
     MFramework newFramework = framework();
 
-    when(validator.validateConnection(any(MConnection.class))).thenReturn(valid);
-    when(validator.validateJob(any(MJob.class))).thenReturn(valid);
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
-    when(frameworkMgr.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
-    when(frameworkMgr.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
+    when(validatorMock.validateConnection(any(MConnection.class))).thenReturn(validRepoMock);
+    when(validatorMock.validateJob(any(MJob.class))).thenReturn(validRepoMock);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(frameworkMgrMock.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
+    when(frameworkMgrMock.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnections(any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobs(any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).updateFramework(any(MFramework.class), any(Connection.class));
-    doReturn(true).when(repoHandler).existsConnection(anyLong(), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnections(any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).updateFramework(any(MFramework.class), any(Connection.class));
+    doReturn(true).when(repoHandlerMock).existsConnection(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update connection error.");
-    doThrow(exception).when(repoHandler).updateConnection(any(MConnection.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).updateConnection(any(MConnection.class), any(Connection.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verify(repoHandler, times(1)).findJobs(any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateFramework(any(MFramework.class), any(Connection.class));
-      verify(repoHandler, times(1)).existsConnection(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateConnection(any(MConnection.class), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateFramework(any(MFramework.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).existsConnection(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateConnection(any(MConnection.class), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 
@@ -929,42 +929,42 @@ public class TestJdbcRepository {
   public void testFrameworkUpgradeHandlerUpdateJobError() {
     MFramework newFramework = framework();
 
-    when(validator.validateConnection(any(MConnection.class))).thenReturn(valid);
-    when(validator.validateJob(any(MJob.class))).thenReturn(valid);
-    when(frameworkMgr.getValidator()).thenReturn(validator);
-    when(frameworkMgr.getMetadataUpgrader()).thenReturn(upgrader);
-    when(frameworkMgr.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
-    when(frameworkMgr.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
+    when(validatorMock.validateConnection(any(MConnection.class))).thenReturn(validRepoMock);
+    when(validatorMock.validateJob(any(MJob.class))).thenReturn(validRepoMock);
+    when(frameworkMgrMock.getValidator()).thenReturn(validatorMock);
+    when(frameworkMgrMock.getMetadataUpgrader()).thenReturn(upgraderMock);
+    when(frameworkMgrMock.getConnectionConfigurationClass()).thenReturn(EmptyConfigurationClass.class);
+    when(frameworkMgrMock.getJobConfigurationClass()).thenReturn(JobConfiguration.class);
 
     List<MConnection> connectionList = connections(connection(1,1), connection(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
-    doReturn(connectionList).when(repoHandler).findConnections(any(Connection.class));
-    doReturn(jobList).when(repoHandler).findJobs(any(Connection.class));
-    doNothing().when(repoHandler).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).deleteConnectionInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).updateFramework(any(MFramework.class), any(Connection.class));
-    doReturn(true).when(repoHandler).existsConnection(anyLong(), any(Connection.class));
-    doReturn(true).when(repoHandler).existsJob(anyLong(), any(Connection.class));
-    doNothing().when(repoHandler).updateConnection(any(MConnection.class), any(Connection.class));
+    doReturn(connectionList).when(repoHandlerMock).findConnections(any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).deleteConnectionInputs(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).updateFramework(any(MFramework.class), any(Connection.class));
+    doReturn(true).when(repoHandlerMock).existsConnection(anyLong(), any(Connection.class));
+    doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
+    doNothing().when(repoHandlerMock).updateConnection(any(MConnection.class), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update job error.");
-    doThrow(exception).when(repoHandler).updateJob(any(MJob.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).updateJob(any(MJob.class), any(Connection.class));
 
     try {
-      repo.upgradeFramework(newFramework);
+      repoSpy.upgradeFramework(newFramework);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandler, times(1)).findConnections(any(Connection.class));
-      verify(repoHandler, times(1)).findJobs(any(Connection.class));
-      verify(repoHandler, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateFramework(any(MFramework.class), any(Connection.class));
-      verify(repoHandler, times(2)).existsConnection(anyLong(), any(Connection.class));
-      verify(repoHandler, times(2)).updateConnection(any(MConnection.class), any(Connection.class));
-      verify(repoHandler, times(1)).existsJob(anyLong(), any(Connection.class));
-      verify(repoHandler, times(1)).updateJob(any(MJob.class), any(Connection.class));
-      verifyNoMoreInteractions(repoHandler);
+      verify(repoHandlerMock, times(1)).findConnections(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).deleteConnectionInputs(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateFramework(any(MFramework.class), any(Connection.class));
+      verify(repoHandlerMock, times(2)).existsConnection(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(2)).updateConnection(any(MConnection.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
+      verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRJobRequest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRJobRequest.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRJobRequest.java
new file mode 100644
index 0000000..2d53dd2
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRJobRequest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.execution.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.framework.JobRequest;
+
+/**
+ * Map-reduce specific submission request containing all extra information
+ * needed for bootstrapping map-reduce job.
+ */
+public class MRJobRequest extends JobRequest {
+
+  /**
+   * Map-reduce specific options.
+   */
+  Class<? extends InputFormat> inputFormatClass;
+  Class<? extends Mapper> mapperClass;
+  Class<? extends Writable> mapOutputKeyClass;
+  Class<? extends Writable> mapOutputValueClass;
+  Class<? extends OutputFormat> outputFormatClass;
+  Class<? extends Writable> outputKeyClass;
+  Class<? extends Writable> outputValueClass;
+
+  public MRJobRequest() {
+    super();
+  }
+
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  public Class<? extends Mapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  public void setMapperClass(Class<? extends Mapper> mapperClass) {
+    this.mapperClass = mapperClass;
+  }
+
+  public Class<? extends Writable> getMapOutputKeyClass() {
+    return mapOutputKeyClass;
+  }
+
+  public void setMapOutputKeyClass(Class<? extends Writable> mapOutputKeyClass) {
+    this.mapOutputKeyClass = mapOutputKeyClass;
+  }
+
+  public Class<? extends Writable> getMapOutputValueClass() {
+    return mapOutputValueClass;
+  }
+
+  public void setMapOutputValueClass(Class<? extends Writable> mapOutputValueClass) {
+    this.mapOutputValueClass = mapOutputValueClass;
+  }
+
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return outputFormatClass;
+  }
+
+  public void setOutputFormatClass(Class<? extends OutputFormat> outputFormatClass) {
+    this.outputFormatClass = outputFormatClass;
+  }
+
+  public Class<? extends Writable> getOutputKeyClass() {
+    return outputKeyClass;
+  }
+
+  public void setOutputKeyClass(Class<? extends Writable> outputKeyClass) {
+    this.outputKeyClass = outputKeyClass;
+  }
+
+  public Class<? extends Writable> getOutputValueClass() {
+    return outputValueClass;
+  }
+
+  public void setOutputValueClass(Class<? extends Writable> outputValueClass) {
+    this.outputValueClass = outputValueClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
deleted file mode 100644
index 32d598c..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.execution.mapreduce;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.framework.SubmissionRequest;
-
-/**
- * Map-reduce specific submission request containing all extra information
- * needed for bootstrapping map-reduce job.
- */
-public class MRSubmissionRequest extends SubmissionRequest {
-
-  /**
-   * Map-reduce specific options.
-   */
-  Class<? extends InputFormat> inputFormatClass;
-  Class<? extends Mapper> mapperClass;
-  Class<? extends Writable> mapOutputKeyClass;
-  Class<? extends Writable> mapOutputValueClass;
-  Class<? extends OutputFormat> outputFormatClass;
-  Class<? extends Writable> outputKeyClass;
-  Class<? extends Writable> outputValueClass;
-
-  public MRSubmissionRequest() {
-    super();
-  }
-
-  public Class<? extends InputFormat> getInputFormatClass() {
-    return inputFormatClass;
-  }
-
-  public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
-    this.inputFormatClass = inputFormatClass;
-  }
-
-  public Class<? extends Mapper> getMapperClass() {
-    return mapperClass;
-  }
-
-  public void setMapperClass(Class<? extends Mapper> mapperClass) {
-    this.mapperClass = mapperClass;
-  }
-
-  public Class<? extends Writable> getMapOutputKeyClass() {
-    return mapOutputKeyClass;
-  }
-
-  public void setMapOutputKeyClass(Class<? extends Writable> mapOutputKeyClass) {
-    this.mapOutputKeyClass = mapOutputKeyClass;
-  }
-
-  public Class<? extends Writable> getMapOutputValueClass() {
-    return mapOutputValueClass;
-  }
-
-  public void setMapOutputValueClass(Class<? extends Writable> mapOutputValueClass) {
-    this.mapOutputValueClass = mapOutputValueClass;
-  }
-
-  public Class<? extends OutputFormat> getOutputFormatClass() {
-    return outputFormatClass;
-  }
-
-  public void setOutputFormatClass(Class<? extends OutputFormat> outputFormatClass) {
-    this.outputFormatClass = outputFormatClass;
-  }
-
-  public Class<? extends Writable> getOutputKeyClass() {
-    return outputKeyClass;
-  }
-
-  public void setOutputKeyClass(Class<? extends Writable> outputKeyClass) {
-    this.outputKeyClass = outputKeyClass;
-  }
-
-  public Class<? extends Writable> getOutputValueClass() {
-    return outputValueClass;
-  }
-
-  public void setOutputValueClass(Class<? extends Writable> outputValueClass) {
-    this.outputValueClass = outputValueClass;
-  }
-}