You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/12/23 11:21:54 UTC

sqoop git commit: SQOOP-2634: Sqoop2: Provide classpath isolation for connectors and its dependencies

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 f4909a563 -> c60d44f94


SQOOP-2634: Sqoop2: Provide classpath isolation for connectors and its dependencies

(Dian Fu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c60d44f9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c60d44f9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c60d44f9

Branch: refs/heads/sqoop2
Commit: c60d44f940e1a6766b4d35ef3642cb97846333f5
Parents: f4909a5
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Dec 23 11:21:15 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Dec 23 11:21:15 2015 +0100

----------------------------------------------------------------------
 assemblies/pom.xml                              | 34 +++++++
 .../resources/assemblies/sqoop-connector.xml    | 44 +++++++++
 .../sqoop/classloader/ConnectorClassLoader.java | 96 ++++++++++----------
 .../apache/sqoop/error/code/ConnectorError.java |  7 +-
 .../sqoop/error/code/MRExecutionError.java      |  3 +
 .../java/org/apache/sqoop/utils/ClassUtils.java | 20 ++++
 .../classloader/TestConnectorClassLoader.java   |  6 +-
 .../org/apache/sqoop/utils/TestClassUtils.java  |  1 +
 connector/connector-ftp/pom.xml                 | 17 +++-
 connector/connector-generic-jdbc/pom.xml        |  9 ++
 connector/connector-hdfs/pom.xml                |  9 ++
 connector/connector-kafka/pom.xml               | 12 ++-
 .../connector/kafka/KafkaToInitializer.java     | 17 ----
 connector/connector-kite/pom.xml                | 10 ++
 .../connector/kite/KiteFromInitializer.java     | 15 +--
 .../sqoop/connector/kite/KiteToInitializer.java | 20 +---
 connector/connector-oracle-jdbc/pom.xml         |  6 ++
 .../sqoop/connector/spi/SqoopConnector.java     | 10 ++
 connector/connector-sftp/pom.xml                |  9 ++
 .../sqoop/connector/sftp/SftpToInitializer.java | 18 ----
 .../sqoop/connector/ConnectorHandler.java       |  7 +-
 .../sqoop/connector/ConnectorManager.java       |  6 +-
 .../sqoop/connector/ConnectorManagerUtils.java  | 28 +++++-
 .../org/apache/sqoop/driver/JobManager.java     | 16 +++-
 .../connector/TestConnectorManagerUtils.java    |  6 ++
 .../mapreduce/MapreduceExecutionEngine.java     |  5 +
 .../org/apache/sqoop/job/MRJobConstants.java    |  6 ++
 .../java/org/apache/sqoop/job/mr/MRUtils.java   | 95 +++++++++++++++++++
 .../apache/sqoop/job/mr/SqoopInputFormat.java   | 27 +++++-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    | 21 ++++-
 .../sqoop/job/mr/SqoopNullOutputFormat.java     | 29 +++++-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   | 57 +++++++-----
 .../org/apache/sqoop/job/TestMapReduce.java     |  3 +
 .../java/org/apache/sqoop/job/TestMatching.java |  1 +
 .../mr/TestSqoopOutputFormatLoadExecutor.java   |  6 +-
 pom.xml                                         | 31 +++++++
 server/pom.xml                                  | 42 +++++++++
 .../mapreduce/MapreduceSubmissionEngine.java    |  1 -
 38 files changed, 582 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/assemblies/pom.xml
----------------------------------------------------------------------
diff --git a/assemblies/pom.xml b/assemblies/pom.xml
new file mode 100644
index 0000000..ebc7941
--- /dev/null
+++ b/assemblies/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>sqoop</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop</groupId>
+  <artifactId>sqoop-assemblies</artifactId>
+  <name>Sqoop Assemblies</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/assemblies/src/main/resources/assemblies/sqoop-connector.xml
----------------------------------------------------------------------
diff --git a/assemblies/src/main/resources/assemblies/sqoop-connector.xml b/assemblies/src/main/resources/assemblies/sqoop-connector.xml
new file mode 100644
index 0000000..3c3c552
--- /dev/null
+++ b/assemblies/src/main/resources/assemblies/sqoop-connector.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+  <id>sqoop-connector</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>lib</outputDirectory>
+      <unpack>false</unpack>
+      <useProjectArtifact>false</useProjectArtifact>
+      <useStrictFiltering>true</useStrictFiltering>
+      <includes>
+        <include>*</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/classes/</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+    </fileSet>
+   </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java b/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java
index 370de2a..fa5ec7d 100644
--- a/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java
+++ b/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java
@@ -125,11 +125,11 @@ public class ConnectorClassLoader extends URLClassLoader {
   private final ClassLoader parent;
   private final List<String> systemClasses;
 
-  public ConnectorClassLoader(URL[] urls, ClassLoader parent,
+  public ConnectorClassLoader(URL url, ClassLoader parent,
       List<String> systemClasses, boolean overrideDefaultSystemClasses) throws IOException {
-    super(urls, parent);
+    super(new URL[] {url}, parent);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("urls: " + Arrays.toString(urls));
+      LOG.debug("url: " + url);
       LOG.debug("system classes: " + systemClasses);
     }
     this.parent = parent;
@@ -147,17 +147,17 @@ public class ConnectorClassLoader extends URLClassLoader {
     LOG.info("system classes: " + this.systemClasses);
 
     urlFactory = new ConnectorURLFactory(this);
-    load(urls);
+    load(url);
   }
 
-  public ConnectorClassLoader(String classpath, ClassLoader parent,
+  public ConnectorClassLoader(String connectorJar, ClassLoader parent,
       List<String> systemClasses) throws IOException {
-    this(constructUrlsFromClasspath(classpath), parent, systemClasses, true);
+    this(connectorJar, parent, systemClasses, true);
   }
 
-  public ConnectorClassLoader(String classpath, ClassLoader parent,
+  public ConnectorClassLoader(String connectorJar, ClassLoader parent,
       List<String> systemClasses, boolean overrideDefaultSystemClasses) throws IOException {
-    this(constructUrlsFromClasspath(classpath), parent, systemClasses, overrideDefaultSystemClasses);
+    this(new File(connectorJar).toURI().toURL(), parent, systemClasses, overrideDefaultSystemClasses);
   }
 
   static URL[] constructUrlsFromClasspath(String classpath)
@@ -485,50 +485,48 @@ public class ConnectorClassLoader extends URLClassLoader {
     return resource;
   }
 
-  private void load(URL[] urls) throws IOException {
-    for (URL url : urls) {
-      String jarName = url.getPath();
-      JarFile jarFile = null;
-      try {
-        jarFile = new JarFile(jarName);
-        Manifest manifest = jarFile.getManifest();
-
-        Enumeration<JarEntry> entryEnum = jarFile.entries();
-        while (entryEnum.hasMoreElements()) {
-          JarEntry entry = entryEnum.nextElement();
-          if (entry.isDirectory()) {
-            continue;
-          }
+  private void load(URL connectorUrl) throws IOException {
+    String jarName = connectorUrl.getPath();
+    JarFile jarFile = null;
+    try {
+      jarFile = new JarFile(jarName);
+      Manifest manifest = jarFile.getManifest();
+
+      Enumeration<JarEntry> entryEnum = jarFile.entries();
+      while (entryEnum.hasMoreElements()) {
+        JarEntry entry = entryEnum.nextElement();
+        if (entry.isDirectory()) {
+          continue;
+        }
 
-          String entryName = entry.getName();
-          InputStream is = jarFile.getInputStream(entry);
-          if (is == null) {
-            throw new IOException("Unable to load resource " + entryName);
-          }
-          try {
-            if (entryName.startsWith(LIB_PREFIX)) {
-              LOG.debug("Caching " + entryName);
-              loadBytesFromJar(is, entryName);
-            } else if (entryName.endsWith(CLASS)) {
-              // A plain vanilla class file rooted at the top of the jar file.
-              loadBytes(entry, is, "/", manifest);
-              LOG.debug("Loaded class: " + jarFile.getName() + "!/" + entry.getName());
-            } else {
-              // A resource
-              loadBytes(entry, is, "/", manifest);
-              LOG.debug("Loaded resource: " + jarFile.getName() + "!/" + entry.getName());
-            }
-          } finally {
-            is.close();
-          }
+        String entryName = entry.getName();
+        InputStream is = jarFile.getInputStream(entry);
+        if (is == null) {
+          throw new IOException("Unable to load resource " + entryName);
         }
-      } finally {
-        if (jarFile != null) {
-          try {
-            jarFile.close();
-          } catch (IOException e) {
-            LOG.debug("Exception closing jarFile: " + jarName, e);
+        try {
+          if (entryName.startsWith(LIB_PREFIX)) {
+            LOG.debug("Caching " + entryName);
+            loadBytesFromJar(is, entryName);
+          } else if (entryName.endsWith(CLASS)) {
+            // A plain vanilla class file rooted at the top of the jar file.
+            loadBytes(entry, is, "/", manifest);
+            LOG.debug("Loaded class: " + jarFile.getName() + "!/" + entry.getName());
+          } else {
+            // A resource
+            loadBytes(entry, is, "/", manifest);
+            LOG.debug("Loaded resource: " + jarFile.getName() + "!/" + entry.getName());
           }
+        } finally {
+          is.close();
+        }
+      }
+    } finally {
+      if (jarFile != null) {
+        try {
+          jarFile.close();
+        } catch (IOException e) {
+          LOG.debug("Exception closing jarFile: " + jarName, e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java
index 2f17d95..cc98368 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java
@@ -59,7 +59,12 @@ public enum ConnectorError implements ErrorCode {
       + "changed since it was registered previously."),
 
   /** A connector is not assigned with a valid id yet. */
-  CONN_0010("A connector is not assigned with a valid id yet");
+  CONN_0010("A connector is not assigned with a valid id yet"),
+
+  /** Failed to create ConnectorClassLoader. */
+  CONN_0011("Failed to create ConnectorClassLoader"),
+
+  ;
 
   private final String message;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java b/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java
index 21e5c82..0ccc84a 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java
@@ -79,6 +79,9 @@ public enum MRExecutionError implements ErrorCode {
   /** Got invalid number of partitions from Partitioner */
   MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
 
+  /** Unable to find connector jar */
+  MAPRED_EXEC_0026("Unable to find connector jar"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java b/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java
index ed68988..d26afdd 100644
--- a/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java
+++ b/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
 
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -271,6 +272,25 @@ public final class ClassUtils {
     return classLoader;
   }
 
+  public static Object executeWithClassLoader(ClassLoader loader, Callable<?> callable) {
+    ClassLoader oldContextClassLoader = Thread.currentThread().getContextClassLoader();
+    if (loader != null) {
+      Thread.currentThread().setContextClassLoader(loader);
+    }
+    try {
+      return callable.call();
+    } catch (Exception e) {
+      if (e instanceof SqoopException) {
+        throw (SqoopException) e;
+      } else {
+        throw new SqoopException(CoreError.CORE_0000, e);
+      }
+    } finally {
+      // Restore the old context ClassLoader
+      Thread.currentThread().setContextClassLoader(oldContextClassLoader);
+    }
+  }
+
   private ClassUtils() {
     // Disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java b/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java
index 87edf3d..be12d56 100644
--- a/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java
+++ b/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java
@@ -143,7 +143,7 @@ public class TestConnectorClassLoader {
 
     ClassLoader currentClassLoader = getClass().getClassLoader();
     ClassLoader connectorClassloader = new ConnectorClassLoader(
-        new URL[] { testJar }, currentClassLoader, null, false);
+        testJar, currentClassLoader, null, false);
 
     assertNull(currentClassLoader.getResourceAsStream("resource.txt"),
         "Resource should be null for current classloader");
@@ -164,7 +164,7 @@ public class TestConnectorClassLoader {
 
     ClassLoader currentClassLoader = getClass().getClassLoader();
     ClassLoader connectorClassloader = new ConnectorClassLoader(
-        new URL[] { testJar }, currentClassLoader, null, false);
+        testJar, currentClassLoader, null, false);
 
     List<String> resourceContents = new ArrayList<String>();
     resourceContents.add("hello A");
@@ -185,7 +185,7 @@ public class TestConnectorClassLoader {
 
     ClassLoader currentClassLoader = getClass().getClassLoader();
     ClassLoader connectorClassloader = new ConnectorClassLoader(
-        new URL[] { testJar }, currentClassLoader, null, false);
+        testJar, currentClassLoader, null, false);
 
     try {
       currentClassLoader.loadClass("A");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java b/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java
index ec48f82..fc24d9a 100644
--- a/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java
+++ b/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java
@@ -171,6 +171,7 @@ public class TestClassUtils {
   @Test
   public void testLoadClassWithClassLoader() throws Exception {
     String classpath = ClassUtils.jarForClass(testAClass);
+    classpath = classpath.startsWith("file:") ? classpath.substring("file:".length()) : classpath;
     assertNotEquals(testAClass, ClassUtils.loadClassWithClassLoader(testAClass.getName(),
         new ConnectorClassLoader(classpath, getClass().getClassLoader(), Arrays.asList("java."))));
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-ftp/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/pom.xml b/connector/connector-ftp/pom.xml
index 41ea026..a330266 100644
--- a/connector/connector-ftp/pom.xml
+++ b/connector/connector-ftp/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -66,10 +67,18 @@ limitations under the License.
     </dependency>
 
     <dependency>
-        <groupId>org.slf4j</groupId>
-        <artifactId>slf4j-log4j12</artifactId>
-        <version>${slf4j.version}</version>
-      </dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-generic-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/pom.xml b/connector/connector-generic-jdbc/pom.xml
index 052f06d..8d054c1 100644
--- a/connector/connector-generic-jdbc/pom.xml
+++ b/connector/connector-generic-jdbc/pom.xml
@@ -36,6 +36,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -60,4 +61,12 @@ limitations under the License.
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 022a024..d695750 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -40,6 +40,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -68,4 +69,12 @@ limitations under the License.
 
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/pom.xml b/connector/connector-kafka/pom.xml
index e0f0684..5f41181 100644
--- a/connector/connector-kafka/pom.xml
+++ b/connector/connector-kafka/pom.xml
@@ -33,6 +33,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
@@ -41,6 +42,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common-test</artifactId>
+      <scope>test</scope>
     </dependency>
 
     <dependency>
@@ -50,4 +52,12 @@ limitations under the License.
     </dependency>
   </dependencies>
 
-</project>
\ No newline at end of file
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
index 923d1aa..7b1f568 100644
--- a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
+++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
@@ -22,9 +22,6 @@ import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.Set;
 
 public class KafkaToInitializer extends Initializer<LinkConfiguration,ToJobConfiguration> {
 
@@ -34,18 +31,4 @@ public class KafkaToInitializer extends Initializer<LinkConfiguration,ToJobConfi
   public void initialize(InitializerContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) {
     LOG.info("Running Kafka Connector initializer. This does nothing except log this message.");
   }
-
-
-  @Override
-  public Set<String> getJars(InitializerContext context, LinkConfiguration
-          linkConfiguration, ToJobConfiguration toJobConfiguration) {
-    Set<String> jars = super.getJars(context, linkConfiguration, toJobConfiguration);
-    // Jars for Kafka, Scala and Yammer (required by Kafka)
-    jars.add(ClassUtils.jarForClass("kafka.javaapi.producer.Producer"));
-    jars.add(ClassUtils.jarForClass("scala.collection.immutable.StringLike"));
-    jars.add(ClassUtils.jarForClass("com.yammer.metrics.Metrics"));
-    return jars;
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-kite/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-kite/pom.xml b/connector/connector-kite/pom.xml
index d8eaa8e..0792445 100644
--- a/connector/connector-kite/pom.xml
+++ b/connector/connector-kite/pom.xml
@@ -36,10 +36,12 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -97,4 +99,12 @@ limitations under the License.
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
index 28c5bac..4502d59 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
@@ -56,24 +56,11 @@ public class KiteFromInitializer extends Initializer<LinkConfiguration,
   public Set<String> getJars(InitializerContext context,
       LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
     Set<String> jars = super.getJars(context, linkConfig, fromJobConfig);
-    jars.add(ClassUtils.jarForClass("org.kitesdk.data.Datasets"));
-    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
-    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
-    jars.add(ClassUtils.jarForClass("parquet.hadoop.metadata.CompressionCodecName"));
-    jars.add(ClassUtils.jarForClass("parquet.format.CompressionCodec"));
-    jars.add(ClassUtils.jarForClass("parquet.avro.AvroParquetWriter"));
-    jars.add(ClassUtils.jarForClass("parquet.column.ParquetProperties"));
-    jars.add(ClassUtils.jarForClass("parquet.Version"));
-    jars.add(ClassUtils.jarForClass("parquet.org.codehaus.jackson.type.TypeReference"));
-    jars.add(ClassUtils.jarForClass("parquet.bytes.CapacityByteArrayOutputStream"));
-    jars.add(ClassUtils.jarForClass("parquet.encoding.Generator"));
-    jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
+
     if (fromJobConfig.fromJobConfig.uri.startsWith("dataset:hive")) {
       // @TODO(Abe): Remove a deps that aren't used?
       jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.conf.HiveConf"));
       jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.serde2.SerDe"));
-      jars.add(ClassUtils.jarForClass("org.kitesdk.data.spi.hive.MetaStoreUtil"));
-      jars.add(ClassUtils.jarForClass("org.kitesdk.compat.DynConstructors"));
       jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.metastore.Warehouse"));
       jars.add(ClassUtils.jarForClass("org.apache.hive.common.HiveCompat"));
       jars.add(ClassUtils.jarForClass("com.facebook.fb303.FacebookService"));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index 50daba0..effab19 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.connector.kite;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -59,28 +58,11 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
   public Set<String> getJars(InitializerContext context,
       LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
     Set<String> jars = super.getJars(context, linkConfig, toJobConfig);
-    jars.add(ClassUtils.jarForClass("org.kitesdk.data.Formats"));
-    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
-    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
-    if (FileFormat.CSV.equals(toJobConfig.toJobConfig.fileFormat)) {
-      jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
-    }
-    if (FileFormat.PARQUET.equals(toJobConfig.toJobConfig.fileFormat)) {
-      jars.add(ClassUtils.jarForClass("parquet.hadoop.metadata.CompressionCodecName"));
-      jars.add(ClassUtils.jarForClass("parquet.format.CompressionCodec"));
-      jars.add(ClassUtils.jarForClass("parquet.avro.AvroParquetWriter"));
-      jars.add(ClassUtils.jarForClass("parquet.column.ParquetProperties"));
-      jars.add(ClassUtils.jarForClass("parquet.Version"));
-      jars.add(ClassUtils.jarForClass("parquet.org.codehaus.jackson.type.TypeReference"));
-      jars.add(ClassUtils.jarForClass("parquet.bytes.CapacityByteArrayOutputStream"));
-      jars.add(ClassUtils.jarForClass("parquet.encoding.Generator"));
-    }
+
     if (toJobConfig.toJobConfig.uri.startsWith("dataset:hive")) {
       // @TODO(Abe): Remove a deps that aren't used?
       jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.conf.HiveConf"));
       jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.serde2.SerDe"));
-      jars.add(ClassUtils.jarForClass("org.kitesdk.data.spi.hive.MetaStoreUtil"));
-      jars.add(ClassUtils.jarForClass("org.kitesdk.compat.DynConstructors"));
       jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.metastore.Warehouse"));
       jars.add(ClassUtils.jarForClass("org.apache.hive.common.HiveCompat"));
       jars.add(ClassUtils.jarForClass("com.facebook.fb303.FacebookService"));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-oracle-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/pom.xml b/connector/connector-oracle-jdbc/pom.xml
index 8186b3a..4262cb2 100644
--- a/connector/connector-oracle-jdbc/pom.xml
+++ b/connector/connector-oracle-jdbc/pom.xml
@@ -36,6 +36,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
 
     <!-- Test dependencies -->
@@ -125,6 +126,11 @@ limitations under the License.
               <excludedGroups>none</excludedGroups>
             </configuration>
           </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-assembly-plugin</artifactId>
+          </plugin>
         </plugins>
       </build>
     </profile>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 85ba8be..6733906 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -37,6 +37,16 @@ import org.apache.sqoop.job.etl.To;
 @InterfaceStability.Evolving
 public abstract class SqoopConnector {
 
+  private String connectorName;
+
+  public void setConnectorName(String connectorName) {
+    this.connectorName = connectorName;
+  }
+
+  public String getConnectorName() {
+    return this.connectorName;
+  }
+
   /**
    * Retrieve connector version.
    *

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-sftp/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sftp/pom.xml b/connector/connector-sftp/pom.xml
index 312ac61..8db1af5 100644
--- a/connector/connector-sftp/pom.xml
+++ b/connector/connector-sftp/pom.xml
@@ -40,6 +40,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -49,4 +50,12 @@ limitations under the License.
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java
index bfb51ac..7f2fde1 100644
--- a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java
+++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java
@@ -23,9 +23,6 @@ import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.Set;
 
 /**
  * Perform any required initialization before execution of job.
@@ -43,19 +40,4 @@ public class SftpToInitializer extends Initializer<LinkConfiguration, ToJobConfi
     LOG.info("Running SFTP Connector TO initializer.");
     // do nothing at this point
   }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Set<String> getJars(InitializerContext context,
-                             LinkConfiguration linkConfiguration,
-                             ToJobConfiguration toJobConfiguration) {
-    Set<String> jars =
-      super.getJars(context, linkConfiguration, toJobConfiguration);
-    // Jar for jsch library:
-    jars.add(ClassUtils.jarForClass("com.jcraft.jsch.JSch"));
-    return jars;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
index 1899bb7..367bb80 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
@@ -71,13 +71,18 @@ public final class ConnectorHandler {
       throw new SqoopException(ConnectorError.CONN_0008, connectorClassName);
     }
 
-    Class<?> connectorClass = ClassUtils.loadClass(connectorClassName);
+    String connectorJarPath = ConnectorManagerUtils.getConnectorJarPath(configFileUrl);
+    ClassLoader connectorClassLoader = ConnectorManagerUtils
+        .createConnectorClassLoader(connectorJarPath, null);
+    Class<?> connectorClass = ClassUtils.loadClassWithClassLoader(
+        connectorClassName, connectorClassLoader);
     if(connectorClass == null) {
       throw new SqoopException(ConnectorError.CONN_0005, connectorClassName);
     }
 
     try {
       connector = (SqoopConnector) connectorClass.newInstance();
+      connector.setConnectorName(connectorUniqueName);
     } catch (IllegalAccessException ex) {
       throw new SqoopException(ConnectorError.CONN_0005,
               connectorClassName, ex);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index f19f391..5663445 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -159,7 +159,11 @@ public class ConnectorManager implements Reconfigurable {
   }
 
   public SqoopConnector getSqoopConnector(String uniqueName) {
-    return handlerMap.get(uniqueName).getSqoopConnector();
+    if (handlerMap != null && handlerMap.get(uniqueName) != null) {
+      return handlerMap.get(uniqueName).getSqoopConnector();
+    } else {
+      return null;
+    }
   }
 
   public synchronized void initialize() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
index 9f9be57..064da35 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
@@ -17,14 +17,17 @@
  */
 package org.apache.sqoop.connector;
 
+import org.apache.sqoop.classloader.ConnectorClassLoader;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.core.ConfigurationConstants;
-import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.error.code.ConnectorError;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -96,4 +99,27 @@ public class ConnectorManagerUtils {
       throw new RuntimeException(e);
     }
   }
+
+  public static String getConnectorJarPath(URL configFileUrl) {
+    return configFileUrl.getPath().substring("file:".length(),
+        configFileUrl.getPath().length() -
+        ("!/" + ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES).length());
+  }
+
+  public static ClassLoader createConnectorClassLoader(
+      final String connectorJarPath, final List<String> systemClasses) {
+    try {
+      return AccessController.doPrivileged(
+        new PrivilegedExceptionAction<ClassLoader>() {
+          @Override
+          public ClassLoader run() throws IOException {
+            return new ConnectorClassLoader(connectorJarPath,
+                Thread.currentThread().getContextClassLoader(),
+                systemClasses, false);
+          }
+        });
+    } catch (PrivilegedActionException e) {
+      throw new SqoopException(ConnectorError.CONN_0011, e);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index d3a750e..39a0260 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
@@ -509,10 +510,19 @@ public class JobManager implements Reconfigurable {
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  private void initializeConnector(JobRequest jobRequest, Direction direction, Initializer initializer, InitializerContext initializerContext) {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  private void initializeConnector(final JobRequest jobRequest, final Direction direction,
+      final Initializer initializer, final InitializerContext initializerContext) {
     // Initialize submission from the connector perspective
-    initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
-        jobRequest.getJobConfig(direction));
+    ClassUtils.executeWithClassLoader(initializer.getClass().getClassLoader(),
+        new Callable<Void>() {
+      @Override
+      public Void call() {
+        initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
+            jobRequest.getJobConfig(direction));
+        return null;
+      }
+    });
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java b/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java
index 423b3df..89ca60f 100644
--- a/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java
+++ b/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sqoop.connector;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -71,4 +72,9 @@ public class TestConnectorManagerUtils {
     assertFalse(ConnectorManagerUtils.isBlacklisted(url, blacklistedConnectors));
   }
 
+  @Test
+  public void testGetConnectorJarPath() throws Exception {
+    URL url = new URL("jar:file:" +  testConnectorPath + "!/" + ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
+    assertEquals(ConnectorManagerUtils.getConnectorJarPath(url), testConnectorPath);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 3acd4a1..d6f0ff7 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -81,6 +81,11 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     if(mrJobRequest.getExtractors() != null) {
       context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
     }
+
+    context.setString(MRJobConstants.JOB_CONNECTOR_FROM_NAME,
+        mrJobRequest.getConnector(Direction.FROM).getConnectorName());
+    context.setString(MRJobConstants.JOB_CONNECTOR_TO_NAME,
+        mrJobRequest.getConnector(Direction.TO).getConnectorName());
   }
 
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
index 737ceda..8968198 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
@@ -66,6 +66,12 @@ public final class MRJobConstants extends Constants {
   public static final String PREFIX_CONNECTOR_TO_CONTEXT =
       PREFIX_JOB_CONFIG + "connector.to.context.";
 
+  public static final String JOB_CONNECTOR_FROM_NAME = PREFIX_JOB_CONFIG
+      + "connector.from.name";
+
+   public static final String JOB_CONNECTOR_TO_NAME = PREFIX_JOB_CONFIG
+      + "connector.to.name";
+
   // Hadoop specific constants
   // We're using constants from Hadoop 1. Hadoop 2 has different names, but
   // provides backward compatibility layer for those names as well.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRUtils.java
new file mode 100644
index 0000000..3e55778
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManagerUtils;
+import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.error.code.MRExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+
+public final class MRUtils {
+  private static ClassLoader fromConnectorClassLoader;
+  private static ClassLoader toConnectorClassLoader;
+
+  private static boolean connectorClassLoaderInited = false;
+
+  public static synchronized void initConnectorClassLoaders(Configuration conf) {
+    if (connectorClassLoaderInited) {
+      return;
+    }
+
+    // Create ConnectorClassLoader for from connector
+    String partitionClass = conf.get(MRJobConstants.JOB_ETL_PARTITION);
+    String fromConnectorName = conf.get(MRJobConstants.JOB_CONNECTOR_FROM_NAME);
+    if (!StringUtils.isBlank(fromConnectorName)) {
+      fromConnectorClassLoader = ConnectorManagerUtils.createConnectorClassLoader(
+          getConnectorJarName(fromConnectorName), Arrays.asList(partitionClass));
+    }
+
+    // Create ConnectorClassLoader for to connector
+    String toConnectorName = conf.get(MRJobConstants.JOB_CONNECTOR_TO_NAME);
+    if (!StringUtils.isBlank(toConnectorName)) {
+      toConnectorClassLoader = ConnectorManagerUtils.createConnectorClassLoader(
+          getConnectorJarName(toConnectorName), null);
+    }
+
+    connectorClassLoaderInited = true;
+  }
+
+  public static ClassLoader getConnectorClassLoader(Direction direction) {
+    switch (direction) {
+      case FROM:
+        return fromConnectorClassLoader;
+      case TO:
+        return toConnectorClassLoader;
+    }
+
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static String getConnectorJarName(String connectorName) {
+    List<URL> configFileUrls = ConnectorManagerUtils.getConnectorConfigs(Collections.EMPTY_SET);
+    try {
+      for (URL configFileUrl : configFileUrls) {
+        Properties properties = new Properties();
+        properties.load(configFileUrl.openStream());
+        if (connectorName.equals(properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME))) {
+          String connectorJarPath = ConnectorManagerUtils.getConnectorJarPath(configFileUrl);
+          return connectorJarPath.substring(connectorJarPath.lastIndexOf("/") + 1);
+        }
+      }
+    } catch (IOException e) {
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0026, connectorName, e);
+    }
+
+    throw new SqoopException(MRExecutionError.MAPRED_EXEC_0026, connectorName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 0623f7b..c471ae9 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.job.mr;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -32,6 +33,8 @@ import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.error.code.MRExecutionError;
 import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Partition;
@@ -54,9 +57,29 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     return new SqoopRecordReader();
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings("unchecked")
   @Override
-  public List<InputSplit> getSplits(JobContext context)
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public List<InputSplit> getSplits(final JobContext context)
+      throws IOException, InterruptedException {
+    SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
+        context.getConfiguration().get(MRJobConstants.JOB_CONNECTOR_FROM_NAME));
+    ClassLoader loader = null;
+    if (connector != null) {
+      loader = connector.getClass().getClassLoader();
+    }
+
+    return (List<InputSplit>) ClassUtils.executeWithClassLoader(loader,
+        new Callable<List<InputSplit>>() {
+      @Override
+      public List<InputSplit> call() throws IOException, InterruptedException {
+        return getSplitsInternal(context);
+      }
+    });
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private List<InputSplit> getSplitsInternal(JobContext context)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 7d20992..6db645e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -18,13 +18,13 @@
 package org.apache.sqoop.job.mr;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MRConstants;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
@@ -61,9 +61,24 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
   private IntermediateDataFormat<Object> toIDF = null;
   private Matcher matcher;
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
-  public void run(Context context) throws IOException, InterruptedException {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public void run(final Context context) throws IOException, InterruptedException {
+    // Set context ClassLoader for this thread to the ClassLoader for from connector
+    MRUtils.initConnectorClassLoaders(context.getConfiguration());
+
+    ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.FROM),
+        new Callable<Void>() {
+      @Override
+      public Void call() throws IOException, InterruptedException {
+        runInternal(context);
+        return null;
+      }
+    });
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void runInternal(Context context) throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
 
     String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 8c8526b..9fb8155 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.sqoop.job.mr;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -32,6 +33,7 @@ import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.utils.ClassUtils;
 
 /**
  * An output format for MapReduce job.
@@ -58,7 +60,8 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita
 
   private static class SqoopDestroyerOutputCommitter extends OutputCommitter {
     @Override
-    public void setupJob(JobContext jobContext) {
+    public void setupJob(JobContext jobContext) throws IOException {
+      MRUtils.initConnectorClassLoaders(jobContext.getConfiguration());
     }
 
     @Override
@@ -73,10 +76,26 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita
       invokeDestroyerExecutor(jobContext, false);
     }
 
-    private void invokeDestroyerExecutor(JobContext jobContext, boolean success) {
-      Configuration config = jobContext.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
-      SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+    private void invokeDestroyerExecutor(final JobContext jobContext, final boolean success) {
+      final Configuration config = jobContext.getConfiguration();
+
+      ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.FROM),
+          new Callable<Void>() {
+        @Override
+        public Void call() throws IOException, InterruptedException {
+          SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+          return null;
+        }
+      });
+      ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.TO),
+          new Callable<Void>() {
+        @Override
+        public Void call() throws IOException, InterruptedException {
+          SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+          return null;
+        }
+      });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 623d1f4..b283982 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -243,32 +243,43 @@ public class SqoopOutputFormatLoadExecutor {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
     public void run() {
       LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
       try {
-        DataReader reader = new SqoopOutputFormatDataReader();
-        Configuration conf = context.getConfiguration();
-        Loader loader = (Loader) ClassUtils.instantiate(loaderName);
-
-        // Objects that should be passed to the Loader
-        PrefixContext subContext = new PrefixContext(conf,
-            MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
-        Object connectorLinkConfig = MRConfigurationUtils
-            .getConnectorLinkConfig(Direction.TO, conf);
-        Object connectorToJobConfig = MRConfigurationUtils
-            .getConnectorJobConfig(Direction.TO, conf);
-        // Using the TO schema since the SqoopDataWriter in the SqoopMapper
-        // encapsulates the toDataFormat
-
-        // Create loader context
-        LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema(), context.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
-
-        LOG.info("Running loader class " + loaderName);
-        loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
-        LOG.info("Loader has finished");
-        ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
-            loader.getRowsWritten());
-
+        final DataReader reader = new SqoopOutputFormatDataReader();
+        final Configuration conf = context.getConfiguration();
+
+        // Set context ClassLoader for this thread to the ClassLoader for to connector
+        MRUtils.initConnectorClassLoaders(conf);
+        ClassLoader classLoader = MRUtils.getConnectorClassLoader(Direction.TO);
+        ClassUtils.executeWithClassLoader(classLoader,
+            new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            Loader loader = (Loader) ClassUtils.instantiate(loaderName);
+
+            // Objects that should be passed to the Loader
+            PrefixContext subContext = new PrefixContext(conf,
+                MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+            Object connectorLinkConfig = MRConfigurationUtils
+                .getConnectorLinkConfig(Direction.TO, conf);
+            Object connectorToJobConfig = MRConfigurationUtils
+                .getConnectorJobConfig(Direction.TO, conf);
+            // Using the TO schema since the SqoopDataWriter in the SqoopMapper
+            // encapsulates the toDataFormat
+
+            // Create loader context
+            LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema(), context.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+
+            LOG.info("Running loader class " + loaderName);
+            loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
+            LOG.info("Loader has finished");
+            ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
+                loader.getRowsWritten());
+            return null;
+          }
+        });
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Error while loading data out of MR job.", t);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 2463643..4697612 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -66,6 +66,7 @@ public class TestMapReduce {
   public void testSqoopInputFormat() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
     conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
     conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
 
@@ -86,6 +87,7 @@ public class TestMapReduce {
   public void testSqoopMapper() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
     conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
@@ -105,6 +107,7 @@ public class TestMapReduce {
   public void testNullOutputFormat() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index d0b41d1..75a989d 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -127,6 +127,7 @@ public class TestMatching {
   public void testSchemaMatching() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
     conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 3dee8f6..58dabbb 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -182,7 +182,8 @@ public class TestSqoopOutputFormatLoadExecutor {
         writer.write(writable, null);
       }
     } catch (SqoopException ex) {
-      throw ex.getCause();
+      Assert.assertTrue(ex.getCause() instanceof SqoopException);
+      throw ex.getCause().getCause();
     }
   }
 
@@ -267,7 +268,8 @@ public class TestSqoopOutputFormatLoadExecutor {
       }
       writer.close(null);
     } catch (SqoopException ex) {
-      throw ex.getCause();
+      Assert.assertTrue(ex.getCause() instanceof SqoopException);
+      throw ex.getCause().getCause();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 460273a..d7b0dd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,6 +124,8 @@ limitations under the License.
     <groovy.version>2.4.0</groovy.version>
     <jansi.version>1.7</jansi.version>
     <felix.version>2.4.0</felix.version>
+    <!-- maven plugin versions -->
+    <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
   </properties>
 
   <dependencies>
@@ -697,6 +699,7 @@ limitations under the License.
   </dependencyManagement>
 
   <modules>
+    <module>assemblies</module>
     <module>common</module>
     <module>common-test</module>
     <module>core</module>
@@ -803,6 +806,34 @@ limitations under the License.
           <artifactId>maven-bundle-plugin</artifactId>
           <version>${felix.version}</version>
         </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-assembly-plugin</artifactId>
+          <version>${maven-assembly-plugin.version}</version>
+          <dependencies>
+            <dependency>
+              <groupId>org.apache.sqoop</groupId>
+              <artifactId>sqoop-assemblies</artifactId>
+              <version>${project.version}</version>
+            </dependency>
+          </dependencies>
+          <executions>
+            <execution>
+              <id>make-assembly</id>
+              <phase>package</phase>
+              <goals>
+                <goal>single</goal>
+              </goals>
+              <configuration>
+                <finalName>${project.artifactId}-${project.version}</finalName>
+                <appendAssemblyId>false</appendAssemblyId>
+                <descriptorRefs>
+                  <descriptorRef>sqoop-connector</descriptorRef>
+                </descriptorRefs>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
       </plugins>
     </pluginManagement>
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 370a6a2..c24183c 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -80,36 +80,78 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-generic-jdbc</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-hdfs</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-kite</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-kafka</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-sftp</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-ftp</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-oracle-jdbc</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c60d44f9/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index f396783..c03bf39 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -47,7 +47,6 @@ import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.model.SubmissionError;
-import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.submission.counter.Counter;
 import org.apache.sqoop.submission.counter.CounterGroup;