Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/23 07:53:27 UTC

git commit: SQOOP-1554: Add NullConfigurationClass/ EmptyConfigurationClass to support use cases that do not have a particular type of config

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 151a0a12a -> 8c604754a


SQOOP-1554: Add NullConfigurationClass/ EmptyConfigurationClass to support use cases that do not have a particular type of config

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/8c604754
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/8c604754
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/8c604754

Branch: refs/heads/sqoop2
Commit: 8c604754a4a8f17faaf9f946962976b937547f22
Parents: 151a0a1
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Oct 22 22:52:07 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Oct 22 22:52:07 2014 -0700

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsConnector.java     | 20 ++++----
 .../sqoop/connector/hdfs/HdfsDestroyer.java     | 36 -------------
 .../sqoop/connector/hdfs/HdfsExtractor.java     | 22 ++++----
 .../sqoop/connector/hdfs/HdfsFromDestroyer.java | 38 ++++++++++++++
 .../connector/hdfs/HdfsFromInitializer.java     | 48 +++++++++++++++++
 .../sqoop/connector/hdfs/HdfsInitializer.java   | 44 ----------------
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 12 ++---
 .../sqoop/connector/hdfs/HdfsPartitioner.java   | 20 ++++----
 .../sqoop/connector/hdfs/HdfsToDestroyer.java   | 38 ++++++++++++++
 .../sqoop/connector/hdfs/HdfsToInitializer.java | 48 +++++++++++++++++
 .../hdfs/configuration/LinkConfig.java          | 29 -----------
 .../hdfs/configuration/LinkConfiguration.java   | 31 -----------
 .../resources/hdfs-connector-config.properties  |  8 ---
 .../sqoop/connector/hdfs/TestExtractor.java     | 28 +++++-----
 .../apache/sqoop/connector/hdfs/TestLoader.java | 34 ++++++------
 .../sqoop/connector/hdfs/TestPartitioner.java   | 25 ++++-----
 .../connector/common/EmptyConfiguration.java    | 29 +++++++++++
 .../org/apache/sqoop/driver/JobManager.java     | 34 ++++++------
 .../org/apache/sqoop/job/TestMapReduce.java     | 21 ++++----
 .../java/org/apache/sqoop/job/TestMatching.java |  9 ++--
 .../org/apache/sqoop/job/etl/Destroyer.java     |  4 +-
 .../org/apache/sqoop/job/etl/Extractor.java     | 10 ++--
 .../org/apache/sqoop/job/etl/Initializer.java   | 54 ++++++++++++--------
 .../java/org/apache/sqoop/job/etl/Loader.java   |  9 ++--
 .../org/apache/sqoop/job/etl/Partitioner.java   | 10 ++--
 25 files changed, 365 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index e63e464..cce0e29 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -18,10 +18,13 @@
 
 package org.apache.sqoop.connector.hdfs;
 
+import java.util.Locale;
+import java.util.ResourceBundle;
+
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.VersionInfo;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
@@ -30,21 +33,18 @@ import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
 import org.apache.sqoop.validation.Validator;
 
-import java.util.Locale;
-import java.util.ResourceBundle;
-
 public class HdfsConnector extends SqoopConnector {
 
   private static final From FROM = new From(
-          HdfsInitializer.class,
+          HdfsFromInitializer.class,
           HdfsPartitioner.class,
           HdfsExtractor.class,
-          HdfsDestroyer.class);
+          HdfsFromDestroyer.class);
 
   private static final To TO = new To(
-          HdfsInitializer.class,
+          HdfsToInitializer.class,
           HdfsLoader.class,
-          HdfsDestroyer.class);
+          HdfsToDestroyer.class);
 
   private static final HdfsValidator hdfsValidator = new HdfsValidator();
 
@@ -71,15 +71,17 @@ public class HdfsConnector extends SqoopConnector {
   /**
    * @return Get connection configuration class
    */
+  @SuppressWarnings("rawtypes")
   @Override
   public Class getLinkConfigurationClass() {
-    return LinkConfiguration.class;
+    return EmptyConfiguration.class;
   }
 
   /**
    * @param jobType
    * @return Get job configuration class for given type or null if not supported
    */
+  @SuppressWarnings("rawtypes")
   @Override
   public Class getJobConfigurationClass(Direction jobType) {
     switch (jobType) {
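
A side note on the truncated hunk above: the diff context ends inside getJobConfigurationClass. Judging from the FromJobConfiguration and ToJobConfiguration imports the commit keeps, the unchanged switch body presumably continues along these lines (a sketch for orientation, not part of the diff):

    // Inferred continuation of getJobConfigurationClass (not shown in the hunk).
    switch (jobType) {
      case FROM:
        return FromJobConfiguration.class;
      case TO:
        return ToJobConfiguration.class;
      default:
        return null; // per the javadoc: null if the direction is not supported
    }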

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
deleted file mode 100644
index 74b1cb8..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs;
-
-import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.DestroyerContext;
-
-public class HdfsDestroyer extends Destroyer {
-  /**
-   * Callback to clean up after job execution.
-   *
-   * @param context Destroyer context
-   * @param o       Connection configuration object
-   * @param o2      Job configuration object
-   */
-  @Override
-  public void destroy(DestroyerContext context, Object o, Object o2) {
-    //TODO: Add a "success" flag?
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 2c8b6c8..31b0a99 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -17,6 +17,9 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,23 +29,20 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.util.LineReader;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
-import org.apache.log4j.Logger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.common.PrefixContext;
-
-import java.io.IOException;
 
 /**
  * Extract from HDFS.
  * Default field delimiter of a record is comma.
  */
-public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> {
+public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> {
 
   public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
 
@@ -51,9 +51,8 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
   private long rowRead = 0;
 
   @Override
-  public void extract(ExtractorContext context,
-      LinkConfiguration linkConfig,
-      FromJobConfiguration fromJobConfig, HdfsPartition partition) {
+  public void extract(ExtractorContext context, EmptyConfiguration linkConfig,
+      FromJobConfiguration jobConfig, HdfsPartition partition) {
 
     conf = ((PrefixContext) context.getContext()).getConfiguration();
     dataWriter = context.getDataWriter();
@@ -91,6 +90,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
    * @param length
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   private void extractSequenceFile(Path file, long start, long length)
       throws IOException {
     LOG.info("Extracting sequence file");
@@ -123,6 +123,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
    * @param length
    * @throws IOException
    */
+  @SuppressWarnings("resource")
   private void extractTextFile(Path file, long start, long length)
       throws IOException {
     LOG.info("Extracting text file");
@@ -182,6 +183,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
    * @param file
    * @return boolean
    */
+  @SuppressWarnings("deprecation")
   private boolean isSequenceFile(Path file) {
     SequenceFile.Reader filereader = null;
     try {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
new file mode 100644
index 0000000..c7d35f7
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConfiguration> {
+  /**
+   * Callback to clean up after job execution.
+   *
+   * @param context Destroyer context
+   * @param linkConfig link configuration object
+   * @param jobConfig FROM job configuration object
+   */
+  @Override
+  public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+      FromJobConfiguration jobConfig) {
+    // do nothing at this point
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
new file mode 100644
index 0000000..0752510
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+
+
+public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJobConfiguration> {
+  /**
+   * Initialize a new submission based on the given configuration properties. Any
+   * needed temporary values may be saved to the context object and will be
+   * promoted to all other parts of the workflow automatically.
+   *
+   * @param context Initializer context object
+   * @param linkConfig link configuration object
+   * @param jobConfig FROM job configuration object
+   */
+  @Override
+  public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+      FromJobConfiguration jobConfig) {
+    // do nothing at this point
+  }
+
+  @Override
+  public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+      FromJobConfiguration jobConfig) {
+    return new Schema("HDFS file");
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
deleted file mode 100644
index bb5e353..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs;
-
-import org.apache.sqoop.job.etl.Initializer;
-import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.schema.Schema;
-
-
-public class HdfsInitializer extends Initializer {
-  /**
-   * Initialize new submission based on given configuration properties. Any
-   * needed temporary values might be saved to context object and they will be
-   * promoted to all other part of the workflow automatically.
-   *
-   * @param context Initializer context object
-   * @param linkConfig       Connector's link configuration object
-   * @param jobConf      Connector's job configuration object
-   */
-  @Override
-  public void initialize(InitializerContext context, Object linkConfig, Object jobConf) {
-
-  }
-
-  @Override
-  public Schema getSchema(InitializerContext context, Object linkConfig, Object jobConfig) {
-    return new Schema("HDFS file");
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 660418d..682349c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -17,13 +17,16 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import java.io.IOException;
+import java.util.UUID;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
@@ -34,10 +37,7 @@ import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.utils.ClassUtils;
 
-import java.io.IOException;
-import java.util.UUID;
-
-public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
+public class HdfsLoader extends Loader<EmptyConfiguration, ToJobConfiguration> {
   /**
    * Load data to target.
    *
@@ -47,7 +47,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
    * @throws Exception
    */
   @Override
-  public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
+  public void load(LoaderContext context, EmptyConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
 
     DataReader reader = context.getDataReader();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index f40459f..daa7fe2 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -21,36 +21,36 @@ package org.apache.sqoop.connector.hdfs;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
-import org.apache.sqoop.common.PrefixContext;
 
 /**
  * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
  * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
  */
-public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {
+public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConfiguration> {
 
   public static final String SPLIT_MINSIZE_PERNODE =
       "mapreduce.input.fileinputformat.split.minsize.per.node";
@@ -68,7 +68,7 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
 
   @Override
   public List<Partition> getPartitions(PartitionerContext context,
-      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+      EmptyConfiguration emptyConfig, FromJobConfiguration fromJobConfig) {
 
     Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
new file mode 100644
index 0000000..8bfd727
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfiguration> {
+  /**
+   * Callback to clean up after job execution.
+   *
+   * @param context Destroyer context
+   * @param linkConfig link configuration object
+   * @param jobConfig TO job configuration object
+   */
+  @Override
+  public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+      ToJobConfiguration jobConfig) {
+    // do nothing at this point
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
new file mode 100644
index 0000000..e3d54b8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+
+
+public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConfiguration> {
+  /**
+   * Initialize a new submission based on the given configuration properties. Any
+   * needed temporary values may be saved to the context object and will be
+   * promoted to all other parts of the workflow automatically.
+   *
+   * @param context Initializer context object
+   * @param linkConfig link configuration object
+   * @param jobConfig TO job configuration object
+   */
+  @Override
+  public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+      ToJobConfiguration jobConfig) {
+    // do nothing at this point
+  }
+
+  @Override
+  public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+      ToJobConfiguration jobConfig) {
+    return new Schema("HDFS file");
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
deleted file mode 100644
index 5d48a29..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.configuration;
-
-import org.apache.sqoop.model.ConfigClass;
-import org.apache.sqoop.model.Input;
-
-@ConfigClass
-public class LinkConfig {
- //Todo: Didn't find anything that belongs here...
- // Since empty forms don't work (DERBYREPO_0008:The config contains no input metadata), I'm putting a dummy config here
-
-  @Input(size = 255) public String dummy;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
deleted file mode 100644
index c0cd336..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Config;
-
-@ConfigurationClass
-public class LinkConfiguration {
-  @Config
-  public LinkConfig linkConfig;
-
-  public LinkConfiguration() {
-    linkConfig = new LinkConfig();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 9b8c6ba..90bc8bc 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -16,14 +16,6 @@
 # Generic HDFS Connector Resources
 
 ############################
-# Link Config
-#
-linkConfig.label = Link configuration
-linkConfig.help = You must supply the information requested in order to \
-                   create a connection object.
-
-linkConfig.dummy.label = Dummy parameter needed to get HDFS connector to register
-linkConfig.dummy.help = You can write anything here. Doesn't matter.
 
 # To Job Config
 #

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index c6d2f90..124c3df 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -17,12 +17,20 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.etl.io.DataWriter;
@@ -35,14 +43,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
-
 @RunWith(Parameterized.class)
 public class TestExtractor extends TestHdfsBase {
   private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
@@ -52,7 +52,7 @@ public class TestExtractor extends TestHdfsBase {
   private ToFormat outputFileType;
   private Class<? extends CompressionCodec> compressionClass;
   private final String inputDirectory;
-  private Extractor extractor;
+  private Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> extractor;
 
   public TestExtractor(ToFormat outputFileType,
                        Class<? extends CompressionCodec> compressionClass)
@@ -131,13 +131,11 @@ public class TestExtractor extends TestHdfsBase {
       }
     });
 
-    LinkConfiguration connConf = new LinkConfiguration();
-
-    FromJobConfiguration jobConf = new FromJobConfiguration();
-
+    EmptyConfiguration emptyLinkConfig = new EmptyConfiguration();
+    FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
     HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
 
-    extractor.extract(context, connConf, jobConf, partition);
+    extractor.extract(context, emptyLinkConfig, emptyJobConfig, partition);
 
     for (int index = 0; index < NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE; ++index) {
       Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index 552a751..8429e15 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -17,6 +17,16 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,7 +37,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -41,16 +51,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
-
 @RunWith(Parameterized.class)
 public class TestLoader extends TestHdfsBase {
   private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
@@ -121,25 +121,25 @@ public class TestLoader extends TestHdfsBase {
         return null;
       }
     }, null);
-    LinkConfiguration connConf = new LinkConfiguration();
+    EmptyConfiguration linkConf = new EmptyConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;
     jobConf.toJobConfig.outputFormat = outputFormat;
     Path outputPath = new Path(outputDirectory);
 
-    loader.load(context, connConf, jobConf);
+    loader.load(context, linkConf, jobConf);
     Assert.assertEquals(1, fs.listStatus(outputPath).length);
 
     for (FileStatus status : fs.listStatus(outputPath)) {
       verifyOutput(fs, status.getPath());
     }
 
-    loader.load(context, connConf, jobConf);
+    loader.load(context, linkConf, jobConf);
     Assert.assertEquals(2, fs.listStatus(outputPath).length);
-    loader.load(context, connConf, jobConf);
-    loader.load(context, connConf, jobConf);
-    loader.load(context, connConf, jobConf);
+    loader.load(context, linkConf, jobConf);
+    loader.load(context, linkConf, jobConf);
+    loader.load(context, linkConf, jobConf);
     Assert.assertEquals(5, fs.listStatus(outputPath).length);
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
index 9d177ec..bef1984 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -17,12 +17,21 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.job.etl.Partition;
@@ -30,17 +39,9 @@ import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.*;
-import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class TestPartitioner extends TestHdfsBase {
@@ -97,12 +98,12 @@ public class TestPartitioner extends TestHdfsBase {
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
-    LinkConfiguration connConf = new LinkConfiguration();
+    EmptyConfiguration linkConf = new EmptyConfiguration();
     FromJobConfiguration jobConf = new FromJobConfiguration();
 
     jobConf.fromJobConfig.inputDirectory = inputDirectory;
 
-    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(context, linkConf, jobConf);
 
     if (this.compressionClass == null) {
       assertEquals(5, partitions.size());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java
new file mode 100644
index 0000000..60b9e93
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Marker configuration class with no inputs, for connectors that do not
+ * need a particular type of config (for example, a link configuration).
+ */
+@ConfigurationClass
+public class EmptyConfiguration {
+
+}
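
Because EmptyConfiguration carries the @ConfigurationClass annotation but declares no @Config members, a connector with no link-level settings can plug it straight into the generic SPI signatures. A minimal sketch with a hypothetical NoopLoader (the readTextRecord call mirrors the DummyLoader test further down):

    import org.apache.sqoop.connector.common.EmptyConfiguration;
    import org.apache.sqoop.job.etl.Loader;
    import org.apache.sqoop.job.etl.LoaderContext;

    // Hypothetical loader for a connector that needs no link configuration.
    public class NoopLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
      @Override
      public void load(LoaderContext context, EmptyConfiguration linkConfig,
          EmptyConfiguration jobConfig) throws Exception {
        // Drain all records from the reader without writing them anywhere.
        while (context.getDataReader().readTextRecord() != null) {
          // intentionally empty
        }
      }
    }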

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 51e562c..e83002d 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -350,25 +350,25 @@ public class JobManager implements Reconfigurable {
     jobRequest.setJobName(job.getName());
     jobRequest.setJobId(job.getPersistenceId());
     jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
-    Class<? extends IntermediateDataFormat<?>> dataFormatClass =
-      fromConnector.getIntermediateDataFormat();
+    Class<? extends IntermediateDataFormat<?>> dataFormatClass = fromConnector
+        .getIntermediateDataFormat();
     jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
 
-
     jobRequest.setFrom(fromConnector.getFrom());
     jobRequest.setTo(toConnector.getTo());
 
+    // set all the jars
     addStandardJars(jobRequest);
     addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
     addConnectorInitializerJars(jobRequest, Direction.FROM);
     addConnectorInitializerJars(jobRequest, Direction.TO);
 
-    Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
-    Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
-
+    // call the initialize method
+    initializeConnector(jobRequest, Direction.FROM);
+    initializeConnector(jobRequest, Direction.TO);
 
-    jobRequest.getSummary().setFromSchema(fromSchema);
-    jobRequest.getSummary().setToSchema(toSchema);
+    jobRequest.getSummary().setFromSchema(getSchemaForConnector(jobRequest, Direction.FROM));
+    jobRequest.getSummary().setToSchema(getSchemaForConnector(jobRequest, Direction.TO));
 
     LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
     return jobRequest;
@@ -435,21 +435,22 @@ public class JobManager implements Reconfigurable {
     }
     return job;
   }
-  
-  private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
 
+  private void initializeConnector(JobRequest jobRequest, Direction direction) {
     Initializer initializer = getConnectorInitializer(jobRequest, direction);
-
-    // Initializer context
     InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
 
     // Initialize submission from the connector perspective
     initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getJobConfig(direction));
+  }
 
+  private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) {
+
+    Initializer initializer = getConnectorInitializer(jobRequest, direction);
+    InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
 
-    return initializer.getSchema(initializerContext,
-        jobRequest.getConnectorLinkConfig(direction),
+    return initializer.getSchema(initializerContext, jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getJobConfig(direction));
   }
 
@@ -459,8 +460,7 @@ public class JobManager implements Reconfigurable {
     InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
     // Add job specific jars to
     jobRequest.addJars(initializer.getJars(initializerContext,
-        jobRequest.getConnectorLinkConfig(direction),
-        jobRequest.getJobConfig(direction)));
+        jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)));
   }
 
   private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
@@ -698,4 +698,4 @@ public class JobManager implements Reconfigurable {
       LOG.info("Ending submission manager update thread");
     }
   }
-}
+}
\ No newline at end of file
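
The net effect of the JobManager refactor is a fixed ordering of connector callbacks: both directions are initialized before either schema is requested, so getSchema can presumably rely on initialize having already run for FROM and TO alike. Condensed from the hunk above, createJobRequest now does:

    // Callback order in createJobRequest after this change (condensed from the diff).
    initializeConnector(jobRequest, Direction.FROM);  // runs Initializer.initialize(...)
    initializeConnector(jobRequest, Direction.TO);
    jobRequest.getSummary().setFromSchema(getSchemaForConnector(jobRequest, Direction.FROM));
    jobRequest.getSummary().setToSchema(getSchemaForConnector(jobRequest, Direction.TO));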

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 6d0dcb4..78ae4ec 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.job;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -33,6 +35,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
@@ -57,8 +60,6 @@ import org.apache.sqoop.schema.type.Text;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class TestMapReduce {
 
   private static final int START_PARTITION = 1;
@@ -170,9 +171,9 @@ public class TestMapReduce {
     }
   }
 
-  public static class DummyExtractor extends Extractor {
+  public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
     @Override
-    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
+    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
         context.getDataWriter().writeArrayRecord(new Object[] {
@@ -250,12 +251,12 @@ public class TestMapReduce {
     }
   }
 
-  public static class DummyLoader extends Loader {
+  public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
     private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
     private Data expected = new Data();
 
     @Override
-    public void load(LoaderContext context, Object oc, Object oj) throws Exception{
+    public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{
       String data;
       while ((data = context.getDataReader().readTextRecord()) != null) {
         expected.setContent(new Object[] {
@@ -269,22 +270,22 @@ public class TestMapReduce {
     }
   }
 
-  public static class DummyFromDestroyer extends Destroyer {
+  public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
 
     public static int count = 0;
 
     @Override
-    public void destroy(DestroyerContext context, Object o, Object o2) {
+    public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
       count++;
     }
   }
 
-  public static class DummyToDestroyer extends Destroyer {
+  public static class DummyToDestroyer extends Destroyer<EmptyConfiguration,EmptyConfiguration> {
 
     public static int count = 0;
 
     @Override
-    public void destroy(DestroyerContext context, Object o, Object o2) {
+    public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
       count++;
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 665a65b..04fb692 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.job;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -34,6 +36,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
@@ -53,8 +56,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.junit.Assert.assertEquals;
-
 
 @RunWith(Parameterized.class)
 public class TestMatching {
@@ -193,9 +194,9 @@ public class TestMatching {
     }
   }
 
-  public static class DummyExtractor extends Extractor {
+  public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, Partition> {
     @Override
-    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
+    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, Partition partition) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
         context.getDataWriter().writeArrayRecord(new Object[] {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index a133106..e2d98ca 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -28,7 +28,9 @@ public abstract class Destroyer<LinkConfiguration, JobConfiguration> {
    *
    * @param context Destroyer context
    * @param linkConfiguration link configuration object
-   * @param jobConfiguration job configuration object
+   * @param jobConfiguration job configuration object for the FROM or TO direction:
+   *        in the FROM destroyer this is the FROM job configuration,
+   *        in the TO destroyer this is the TO job configuration
    */
   public abstract void destroy(DestroyerContext context,
                                LinkConfiguration linkConfiguration,

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index d6c186d..85e91ef 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -21,20 +21,20 @@ package org.apache.sqoop.job.etl;
  * This allows connector to extract data from a source system
  * based on each partition.
  */
-public abstract class Extractor<LinkConfiguration, JobConfiguration, Partition> {
+public abstract class Extractor<LinkConfiguration, FromJobConfiguration, SqoopPartition> {
 
   /**
    * Extract data from source and pass them into the Sqoop.
    *
    * @param context Extractor context object
    * @param linkConfiguration link configuration object
-   * @param jobConfiguration job configuration object
-   * @param partition Partition that this extract should work on
+   * @param jobConfiguration FROM job configuration object
+   * @param partition Partition that this extractor should work on
    */
   public abstract void extract(ExtractorContext context,
                                LinkConfiguration linkConfiguration,
-                               JobConfiguration jobConfiguration,
-                               Partition partition);
+                               FromJobConfiguration jobConfiguration,
+                               SqoopPartition partition);
 
   /**
    * Return the number of rows read by the last call to

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 5c48fc3..d66b099 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -35,27 +35,39 @@ public abstract class Initializer<LinkConfiguration, JobConfiguration> {
    *
    * @param context Initializer context object
    * @param linkConfiguration link configuration object
-   * @param jobConfiguration job configuration object
+   * @param jobConfiguration job configuration object for the FROM or TO direction:
+   *        in the FROM initializer this is the FROM job configuration,
+   *        in the TO initializer this is the TO job configuration
    */
-	public abstract void initialize(InitializerContext context,
-			LinkConfiguration linkConfiguration,
-			JobConfiguration jobConfiguration);
-
-	/**
-	 * Return list of all jars that this particular connector needs to operate
-	 * on following job. This method will be called after running initialize
-	 * method.
-	 *
-	 * @return
-	 */
-	public List<String> getJars(InitializerContext context,
-			LinkConfiguration linkConfiguration,
-			JobConfiguration jobConfiguration) {
-		return new LinkedList<String>();
-	}
-
-	public abstract Schema getSchema(InitializerContext context,
-			LinkConfiguration linkConfiguration,
-			JobConfiguration jobConfiguration);
+  public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,
+      JobConfiguration jobConfiguration);
+
+  /**
+   * Return a list of all jars that this particular connector needs to run the
+   * job. This method will be called after the initialize method.
+   * @param context Initializer context object
+   * @param linkConfiguration link configuration object
+   * @param jobConfiguration job configuration object for the FROM or TO direction:
+   *        in the FROM initializer this is the FROM job configuration,
+   *        in the TO initializer this is the TO job configuration
+   * @return list of jars needed by the connector for this job
+   */
+  public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,
+      JobConfiguration jobConfiguration) {
+    return new LinkedList<String>();
+  }
+
+  /**
+   * Return the schema associated with the connector for the FROM or TO direction.
+   * @param context Initializer context object
+   * @param linkConfiguration link configuration object
+   * @param jobConfiguration job configuration object for the FROM or TO direction:
+   *        in the FROM initializer this is the FROM job configuration,
+   *        in the TO initializer this is the TO job configuration
+   * @return schema for the direction this initializer serves
+   */
+  public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,
+      JobConfiguration jobConfiguration);
 
 }
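
With the generics in place, a connector-side initializer becomes a small, fully typed class. A minimal sketch with hypothetical names, reusing the SDK's EmptyConfiguration for both type parameters:

    import org.apache.sqoop.connector.common.EmptyConfiguration;
    import org.apache.sqoop.job.etl.Initializer;
    import org.apache.sqoop.job.etl.InitializerContext;
    import org.apache.sqoop.schema.Schema;

    // Hypothetical initializer; both config types are the SDK's empty marker class.
    public class NoopInitializer extends Initializer<EmptyConfiguration, EmptyConfiguration> {
      @Override
      public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
          EmptyConfiguration jobConfig) {
        // nothing to set up
      }

      @Override
      public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
          EmptyConfiguration jobConfig) {
        // Same Schema(String) constructor the HDFS initializers above use.
        return new Schema("noop");
      }
    }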

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index cc32ada..3b6bd71 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -20,18 +20,17 @@ package org.apache.sqoop.job.etl;
 /**
  * This allows connector to load data into a target system.
  */
-public abstract class Loader<LinkConfiguration, JobConfiguration> {
+public abstract class Loader<LinkConfiguration, ToJobConfiguration> {
 
   /**
    * Load data to target.
    *
    * @param context Loader context object
    * @param linkConfiguration link configuration object
-   * @param jobConfiguration job configuration object
+   * @param jobConfiguration TO job configuration object
    * @throws Exception
    */
-	public abstract void load(LoaderContext context,
-			LinkConfiguration linkConfiguration,
-			JobConfiguration jobConfiguration) throws Exception;
+  public abstract void load(LoaderContext context, LinkConfiguration linkConfiguration,
+      ToJobConfiguration jobConfiguration) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
index 57507df..3636130 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
@@ -20,10 +20,10 @@ package org.apache.sqoop.job.etl;
 import java.util.List;
 
 /**
- * This allows connector to define how input data to be partitioned.
+ * This allows the connector to define how input data from the FROM source is partitioned.
  * The number of data partitions also determines the degree of parallelism.
  */
-public abstract class Partitioner<LinkConfiguration, JobConfiguration> {
+public abstract class Partitioner<LinkConfiguration, FromJobConfiguration> {
 
   /**
    * Partition input data into partitions.
@@ -35,8 +35,6 @@ public abstract class Partitioner<LinkConfiguration, JobConfiguration> {
    * @param jobConfiguration job configuration object
    * @return
    */
-	public abstract List<Partition> getPartitions(PartitionerContext context,
-			LinkConfiguration linkConfiguration,
-			JobConfiguration jobConfiguration);
-
+  public abstract List<Partition> getPartitions(PartitionerContext context,
+      LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration);
 }
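
The same pattern closes out the Partitioner end of the SPI; a minimal hypothetical subclass under the new signature might look like:

    import java.util.LinkedList;
    import java.util.List;

    import org.apache.sqoop.connector.common.EmptyConfiguration;
    import org.apache.sqoop.job.etl.Partition;
    import org.apache.sqoop.job.etl.Partitioner;
    import org.apache.sqoop.job.etl.PartitionerContext;

    // Hypothetical partitioner that produces no work units.
    public class NoopPartitioner extends Partitioner<EmptyConfiguration, EmptyConfiguration> {
      @Override
      public List<Partition> getPartitions(PartitionerContext context,
          EmptyConfiguration linkConfig, EmptyConfiguration jobConfig) {
        return new LinkedList<Partition>(); // no input, no partitions
      }
    }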