Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/10/19 22:35:48 UTC

git commit: SQOOP-610: Job submission engine for import

Updated Branches:
  refs/heads/sqoop2 6cd9e9688 -> 7fbd3ba94


SQOOP-610: Job submission engine for import

(Bilung Lee via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/7fbd3ba9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/7fbd3ba9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/7fbd3ba9

Branch: refs/heads/sqoop2
Commit: 7fbd3ba94b970ca4c2053d3930dbf6d84989213e
Parents: 6cd9e96
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Oct 19 13:28:18 2012 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Oct 19 13:28:18 2012 -0700

----------------------------------------------------------------------
 .../java/org/apache/sqoop/job/JobConstants.java    |   10 +
 .../main/java/org/apache/sqoop/job/JobEngine.java  |   37 +++
 .../org/apache/sqoop/job/etl/EtlFramework.java     |  148 +++++++++++
 .../java/org/apache/sqoop/job/etl/EtlOptions.java  |  165 ++++++++++++
 .../java/org/apache/sqoop/job/mr/MrExecution.java  |  153 +++++++++++
 .../java/org/apache/sqoop/job/mr/SqoopReducer.java |   35 +++
 .../java/org/apache/sqoop/job/TestJobEngine.java   |  199 +++++++++++++++
 7 files changed, 747 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
index a032c72..2b0ec18 100644
--- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -21,6 +21,16 @@ import org.apache.sqoop.core.ConfigurationConstants;
 
 public final class JobConstants extends Constants {
 
+  // Metadata constants
+
+  public static final String INPUT_JOB_JOB_TYPE = "inp-job-job-type";
+  public static final String INPUT_JOB_STORAGE_TYPE = "inp-job-storage-type";
+  public static final String INPUT_JOB_FORMAT_TYPE = "inp-job-format-type";
+  public static final String INPUT_JOB_OUTPUT_CODEC = "inp-job-output-codec";
+  public static final String INPUT_JOB_MAX_EXTRACTORS = "inp-job-max-extractors";
+  public static final String INPUT_JOB_MAX_LOADERS = "inp-job-max-loaders";
+
+
   /**
    * All job related configuration is prefixed with this:
    * <tt>org.apache.sqoop.job.</tt>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/JobEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/JobEngine.java b/core/src/main/java/org/apache/sqoop/job/JobEngine.java
new file mode 100644
index 0000000..fa3e484
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/JobEngine.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.job.etl.EtlFramework;
+import org.apache.sqoop.job.etl.EtlOptions;
+import org.apache.sqoop.job.mr.MrExecution;
+
+/**
+ * This class supports Sqoop job execution.
+ */
+public class JobEngine {
+
+  public void run(EtlOptions options) {
+    EtlFramework etl = new EtlFramework(options);
+    MrExecution mr = new MrExecution(etl);
+    mr.initialize();
+    mr.run();
+    mr.destroy();
+  }
+
+}
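
For context, a minimal caller-side sketch of how this engine is meant to be driven. It is illustrative only and not part of this patch: ImportJobSketch is a hypothetical class, and the connector argument is whatever SqoopConnector implementation the caller has (the DummyConnector in the test at the end of this patch is one example).

import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.JobEngine;
import org.apache.sqoop.job.etl.EtlOptions;

public class ImportJobSketch {

  /**
   * Runs an HDFS text import with whatever connector the caller supplies.
   * The option keys are the INPUT_JOB_* constants added by this commit.
   */
  public static void runImport(SqoopConnector connector) {
    EtlOptions options = new EtlOptions(connector);
    options.setOption(JobConstants.INPUT_JOB_JOB_TYPE, "IMPORT");
    options.setOption(JobConstants.INPUT_JOB_STORAGE_TYPE, "HDFS");
    options.setOption(JobConstants.INPUT_JOB_FORMAT_TYPE, "TEXT");

    // JobEngine builds the EtlFramework from the options and then drives
    // the MapReduce execution: initialize() -> run() -> destroy().
    new JobEngine().run(options);
  }
}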

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java
new file mode 100644
index 0000000..ce7f988
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlFramework.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.EtlOptions.FormatType;
+import org.apache.sqoop.job.etl.EtlOptions.JobType;
+import org.apache.sqoop.job.etl.EtlOptions.StorageType;
+
+/**
+ * This class encapsulates the whole ETL framework.
+ *
+ * For import:
+ * Initializer (connector-defined)
+ * -> Partitioner (connector-defined)
+ * -> Extractor (connector-defined)
+ * -> Loader (framework-defined)
+ * -> Destroyer (connector-defined)
+ *
+ * For export:
+ * Initializer (connector-defined)
+ * -> Partitioner (framework-defined)
+ * -> Extractor (framework-defined)
+ * -> Loader (connector-defined)
+ * -> Destroyer (connector-defined)
+ */
+public class EtlFramework {
+
+  private Class<? extends Initializer> initializer;
+  private Class<? extends Partitioner> partitioner;
+  private Class<? extends Extractor> extractor;
+  private Class<? extends Loader> loader;
+  private Class<? extends Destroyer> destroyer;
+
+  private boolean requireFieldNames;
+  private boolean requireOutputDirectory;
+
+  private EtlOptions options;
+
+  public EtlFramework(EtlOptions inputs) {
+    this.options = inputs;
+    JobType jobType = options.getJobType();
+    switch (jobType) {
+    case IMPORT:
+      constructImport();
+      break;
+    case EXPORT:
+      constructExport();
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, jobType.toString());
+    }
+  }
+
+  public EtlOptions getOptions() {
+    return options;
+  }
+
+  public Class<? extends Initializer> getInitializer() {
+    return initializer;
+  }
+
+  public Class<? extends Partitioner> getPartitioner() {
+    return partitioner;
+  }
+
+  public Class<? extends Extractor> getExtractor() {
+    return extractor;
+  }
+
+  public Class<? extends Loader> getLoader() {
+    return loader;
+  }
+
+  public Class<? extends Destroyer> getDestroyer() {
+    return destroyer;
+  }
+
+  public boolean isFieldNamesRequired() {
+    return requireFieldNames;
+  }
+
+  public boolean isOutputDirectoryRequired() {
+    return requireOutputDirectory;
+  }
+
+  private void constructImport() {
+    Importer importer = options.getConnector().getImporter();
+    initializer = importer.getInitializer();
+    partitioner = importer.getPartitioner();
+    extractor = importer.getExtractor();
+    destroyer = importer.getDestroyer();
+
+    StorageType storageType = options.getStorageType();
+    switch (storageType) {
+    case HDFS:
+      FormatType formatType = options.getFormatType();
+      switch (formatType) {
+      case TEXT:
+        loader = HdfsTextImportLoader.class;
+        requireOutputDirectory = true;
+        break;
+      case SEQUENCE:
+        loader = HdfsSequenceImportLoader.class;
+        requireOutputDirectory = true;
+        break;
+      default:
+        throw new SqoopException(CoreError.CORE_0012, formatType.toString());
+      }
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, storageType.toString());
+    }
+  }
+
+  private void constructExport() {
+    Exporter exporter = options.getConnector().getExporter();
+    initializer = exporter.getInitializer();
+    loader = exporter.getLoader();
+    destroyer = exporter.getDestroyer();
+
+    // FIXME: set partitioner/extractor based on storage/format types
+  }
+
+}
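
The loader selection described in the class comment can be observed from the outside with a short sketch like the one below (illustrative, not part of this patch; FrameworkSelectionSketch is hypothetical and the connector must provide an Importer, as the DummyConnector in the test does).

import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.EtlFramework;
import org.apache.sqoop.job.etl.EtlOptions;

public class FrameworkSelectionSketch {

  /** Shows which loader the framework picks for an HDFS SequenceFile import. */
  public static void describe(SqoopConnector connector) {
    EtlOptions options = new EtlOptions(connector);
    // Job type and storage type are left unset and default to IMPORT and HDFS;
    // only the format is overridden here.
    options.setOption(JobConstants.INPUT_JOB_FORMAT_TYPE, "SEQUENCE");

    EtlFramework etl = new EtlFramework(options);
    // Initializer, Partitioner, Extractor and Destroyer come from the
    // connector's Importer; the Loader is framework-defined.
    System.out.println(etl.getLoader());                 // HdfsSequenceImportLoader
    System.out.println(etl.isOutputDirectoryRequired()); // true
  }
}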

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java
new file mode 100644
index 0000000..e45c0ff
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlOptions.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.util.HashMap;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+
+/**
+ * This class retrieves information for job execution from user-input options.
+ */
+public class EtlOptions implements Options {
+
+  HashMap<String, String> store = new HashMap<String, String>();
+
+  public EtlOptions(SqoopConnector connector) {
+    this.connector = connector;
+  }
+
+  private SqoopConnector connector;
+  public SqoopConnector getConnector() {
+    return connector;
+  }
+
+  private JobType jobType = null;
+  public enum JobType {
+    IMPORT,
+    EXPORT
+  }
+  public JobType getJobType() {
+    if (jobType != null) {
+      return jobType;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_JOB_TYPE);
+    if (option == null || option.equalsIgnoreCase("IMPORT")) {
+      jobType = JobType.IMPORT;
+    } else if (option.equalsIgnoreCase("EXPORT")) {
+      jobType = JobType.EXPORT;
+    } else {
+      throw new SqoopException(CoreError.CORE_0012, option);
+    }
+    return jobType;
+  }
+
+  private StorageType storageType = null;
+  public enum StorageType {
+    HDFS
+  }
+  public StorageType getStorageType() {
+    if (storageType != null) {
+      return storageType;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_STORAGE_TYPE);
+    if (option == null || option.equalsIgnoreCase("HDFS")) {
+      storageType = StorageType.HDFS;
+    } else {
+      throw new SqoopException(CoreError.CORE_0012, option);
+    }
+    return storageType;
+  }
+
+  private FormatType formatType = null;
+  public enum FormatType {
+    TEXT,
+    SEQUENCE
+  }
+  public FormatType getFormatType() {
+    if (formatType != null) {
+      return formatType;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_FORMAT_TYPE);
+    if (option == null || option.equalsIgnoreCase("TEXT")) {
+      formatType = FormatType.TEXT;
+    } else if (option.equalsIgnoreCase("SEQUENCE")) {
+      formatType = FormatType.SEQUENCE;
+    } else {
+      throw new SqoopException(CoreError.CORE_0012, option);
+    }
+    return formatType;
+  }
+
+  public String getOutputCodec() {
+    return store.get(JobConstants.INPUT_JOB_OUTPUT_CODEC);
+  }
+
+  private int maxExtractors = -1;
+  public int getMaxExtractors() {
+    if (maxExtractors != -1) {
+      return maxExtractors;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_MAX_EXTRACTORS);
+    if (option != null) {
+      maxExtractors = Integer.parseInt(option);
+    } else {
+      JobType type = getJobType();
+      switch (type) {
+      case IMPORT:
+        maxExtractors = 4;
+        break;
+      case EXPORT:
+        maxExtractors = 1;
+        break;
+      default:
+        throw new SqoopException(CoreError.CORE_0012, type.toString());
+      }
+    }
+    return maxExtractors;
+  }
+
+  private int maxLoaders = -1;
+  public int getMaxLoaders() {
+    if (maxLoaders != -1) {
+      return maxLoaders;
+    }
+
+    String option = store.get(JobConstants.INPUT_JOB_MAX_LOADERS);
+    if (option != null) {
+      maxLoaders = Integer.parseInt(option);
+    } else {
+      JobType type = getJobType();
+      switch (type) {
+      case IMPORT:
+        maxLoaders = 1;
+        break;
+      case EXPORT:
+        maxLoaders = 4;
+        break;
+      default:
+        throw new SqoopException(CoreError.CORE_0012, type.toString());
+      }
+    }
+    return maxLoaders;
+  }
+
+  public void setOption(String key, String value) {
+    store.put(key, value);
+  }
+
+  @Override
+  public String getOption(String key) {
+    return store.get(key);
+  }
+}
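
The defaults encoded in these getters work out as follows; a compile-only sketch (EtlOptionsDefaultsSketch is hypothetical), passing null for the connector because only the option store and the job-type switch are exercised on this path.

import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.EtlOptions;

public class EtlOptionsDefaultsSketch {

  public static void main(String[] args) {
    // No options set: job type falls back to IMPORT.
    EtlOptions importDefaults = new EtlOptions(null);
    System.out.println(importDefaults.getJobType());       // IMPORT
    System.out.println(importDefaults.getMaxExtractors()); // 4 map tasks
    System.out.println(importDefaults.getMaxLoaders());    // 1 (SqoopReducer is only set when > 1)

    // Values are cached after the first read, so use a fresh instance for a
    // different job type. Option values are matched case-insensitively.
    EtlOptions exportDefaults = new EtlOptions(null);
    exportDefaults.setOption(JobConstants.INPUT_JOB_JOB_TYPE, "export");
    System.out.println(exportDefaults.getMaxExtractors()); // 1
    System.out.println(exportDefaults.getMaxLoaders());    // 4 reduce tasks
  }
}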

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java b/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java
new file mode 100644
index 0000000..bd4c108
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/mr/MrExecution.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.EtlFramework;
+import org.apache.sqoop.job.etl.EtlMutableContext;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.EtlOptions;
+import org.apache.sqoop.job.etl.EtlOptions.JobType;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * This class encapsulates the whole MapReduce execution.
+ */
+public class MrExecution {
+
+  private Configuration conf;
+  private EtlFramework etl;
+
+  public MrExecution(EtlFramework etl) {
+    this.conf = new Configuration();
+    this.etl = etl;
+  }
+
+  public void initialize() {
+    EtlOptions options = etl.getOptions();
+
+    conf.setInt(JobConstants.JOB_ETL_NUMBER_PARTITIONS,
+        options.getMaxExtractors());
+
+    if (options.getOutputCodec() != null) {
+      conf.setBoolean(FileOutputFormat.COMPRESS, true);
+      conf.set(FileOutputFormat.COMPRESS_CODEC, options.getOutputCodec());
+    }
+
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, etl.getPartitioner().getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, etl.getExtractor().getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, etl.getLoader().getName());
+
+    EtlMutableContext context = new EtlMutableContext(conf);
+
+    Class<? extends Initializer> initializer = etl.getInitializer();
+    if (initializer != null) {
+      Initializer instance;
+      try {
+        instance = (Initializer) initializer.newInstance();
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, initializer.getName(), e);
+      }
+      instance.run(context, options);
+    }
+
+    JobType jobType = etl.getOptions().getJobType();
+    switch (jobType) {
+    case IMPORT:
+      checkImportConfiguration(context);
+      break;
+    case EXPORT:
+      checkExportConfiguration(context);
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, jobType.toString());
+    }
+  }
+
+  public void run() {
+    EtlOptions options = etl.getOptions();
+
+    try {
+      Job job = Job.getInstance(conf);
+
+      job.setInputFormatClass(SqoopInputFormat.class);
+      job.setMapperClass(SqoopMapper.class);
+      job.setMapOutputKeyClass(Data.class);
+      job.setMapOutputValueClass(NullWritable.class);
+      if (options.getMaxLoaders() > 1) {
+        job.setReducerClass(SqoopReducer.class);
+        job.setNumReduceTasks(options.getMaxLoaders());
+      }
+      job.setOutputFormatClass((etl.isOutputDirectoryRequired()) ?
+              SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
+      job.setOutputKeyClass(Data.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      boolean success = job.waitForCompletion(true);
+      if (!success) {
+        throw new SqoopException(CoreError.CORE_0008);
+      }
+
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0008, e);
+    }
+  }
+
+  public void destroy() {
+    Class<? extends Destroyer> destroyer = etl.getDestroyer();
+    if (destroyer != null) {
+      Destroyer instance;
+      try {
+        instance = (Destroyer) destroyer.newInstance();
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, destroyer.getName(), e);
+      }
+      instance.run(new EtlContext(conf));
+    }
+  }
+
+  private void checkImportConfiguration(EtlMutableContext context) {
+    if (etl.isFieldNamesRequired() &&
+        context.getString(JobConstants.JOB_ETL_FIELD_NAMES) == null) {
+      throw new SqoopException(CoreError.CORE_0020, "field names");
+    }
+
+    if (etl.isOutputDirectoryRequired()) {
+      String outputDirectory =
+          context.getString(JobConstants.JOB_ETL_OUTPUT_DIRECTORY);
+      if (outputDirectory == null) {
+        throw new SqoopException(CoreError.CORE_0020, "output directory");
+      } else {
+        context.setString(FileOutputFormat.OUTDIR, outputDirectory);
+      }
+    }
+  }
+
+  private void checkExportConfiguration(EtlMutableContext context) {
+    // TODO: check export related configuration
+  }
+
+}
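
One thing checkImportConfiguration() implies for connector authors: when the chosen loader writes to HDFS, the connector's Initializer has to place an output directory into the context, which MrExecution then copies into FileOutputFormat.OUTDIR. A hypothetical sketch follows (the class name and path are made up; DummyImportInitializer in the test below is the in-tree equivalent).

import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.MutableContext;
import org.apache.sqoop.job.etl.Options;

public class SketchImportInitializer extends Initializer {

  @Override
  public void run(MutableContext context, Options options) {
    // Required whenever EtlFramework marked the output directory as needed;
    // without it, MrExecution fails with CORE_0020 during initialize().
    context.setString(JobConstants.JOB_ETL_OUTPUT_DIRECTORY,
        "/tmp/sqoop/sketch-import");
  }
}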

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
new file mode 100644
index 0000000..d236148
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * A reducer that performs the reduce phase of the job (identity by default).
+ */
+public class SqoopReducer
+    extends Reducer<Data, NullWritable, Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopReducer.class.getName());
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7fbd3ba9/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
new file mode 100644
index 0000000..e653c22
--- /dev/null
+++ b/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.JobEngine;
+import org.apache.sqoop.job.etl.Context;
+import org.apache.sqoop.job.etl.EtlOptions;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.MutableContext;
+import org.apache.sqoop.job.etl.Options;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MJob.Type;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.validation.Validator;
+import org.junit.Test;
+
+public class TestJobEngine extends TestCase {
+
+  private static final String DATA_DIR = TestJobEngine.class.getSimpleName();
+  private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/";
+
+  private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR;
+  private static final String OUTPUT_FILE = "part-r-00000";
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 9;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+
+  @Test
+  public void testImport() throws Exception {
+    FileUtils.delete(OUTPUT_DIR);
+
+    DummyConnector connector = new DummyConnector();
+    EtlOptions options = new EtlOptions(connector);
+
+    JobEngine engine = new JobEngine();
+    engine.run(options);
+
+    String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE;
+    InputStream filestream = FileUtils.open(fileName);
+    BufferedReader filereader = new BufferedReader(new InputStreamReader(
+        filestream, Data.CHARSET_NAME));
+    verifyOutput(filereader);
+  }
+
+  private void verifyOutput(BufferedReader reader)
+      throws IOException {
+    String line = null;
+    int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+    Data expected = new Data();
+    while ((line = reader.readLine()) != null){
+      expected.setContent(new Object[] {
+          new Integer(index),
+          new Double(index),
+          String.valueOf(index) },
+          Data.ARRAY_RECORD);
+      index++;
+
+      assertEquals(expected.toString(), line);
+    }
+    reader.close();
+
+    assertEquals(NUMBER_OF_PARTITIONS*NUMBER_OF_ROWS_PER_PARTITION,
+        index-START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION);
+  }
+
+  public class DummyConnector implements SqoopConnector {
+
+    @Override
+    public Importer getImporter() {
+      return new Importer(
+          DummyImportInitializer.class,
+          DummyImportPartitioner.class,
+          DummyImportExtractor.class,
+          null);
+    }
+
+    @Override
+    public Exporter getExporter() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public ResourceBundle getBundle(Locale locale) {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Validator getValidator() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Class getConnectionConfigurationClass() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Class getJobConfigurationClass(Type jobType) {
+      fail("This method should not be invoked.");
+      return null;
+    }
+  }
+
+  public static class DummyImportInitializer extends Initializer {
+    @Override
+    public void run(MutableContext context, Options options) {
+      context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR);
+    }
+  }
+
+  public static class DummyImportPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  public static class DummyImportPartitioner extends Partitioner {
+    @Override
+    public List<Partition> run(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+        DummyImportPartition partition = new DummyImportPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  public static class DummyImportExtractor extends Extractor {
+    @Override
+    public void run(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyImportPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+        writer.writeArrayRecord(new Object[] {
+            new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+      }
+    }
+  }
+
+}