You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/06 21:16:09 UTC

[2/3] SQOOP-666 Separate execution engine module (Jarek Jarcec Cecho)

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
new file mode 100644
index 0000000..d236148
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * Reducer whose input and output pairs are both (Data, NullWritable); it overrides no method.
+ */
+public class SqoopReducer
+    extends Reducer<Data, NullWritable, Data, NullWritable> {
+  // Public commons-logging logger named after this class; nothing in this class logs to it.
+  public static final Log LOG =
+      LogFactory.getLog(SqoopReducer.class.getName());
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
new file mode 100644
index 0000000..7dc9541
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * Input split that carries one {@link Partition}, serialized as its class name plus content.
+ */
+public class SqoopSplit extends InputSplit implements Writable {
+
+  private Partition partition;
+  /** Sets the partition carried by this split. */
+  public void setPartition(Partition partition) {
+    this.partition = partition;
+  }
+  /** Returns the partition, which stays null until it is set or read via readFields. */
+  public Partition getPartition() {
+    return partition;
+  }
+  /** Always reports a length of 0. */
+  @Override
+  public long getLength() throws IOException {
+    return 0;
+  }
+  /** Always returns an empty array of locations. */
+  @Override
+  public String[] getLocations() throws IOException {
+    return new String[] {};
+  }
+  /** Restores the partition: reads its class name, instantiates it, then reads its content. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read the Partition class name, which write() emits first
+    String className = in.readUTF();
+    // load that class and create an instance through its no-argument constructor
+    Class<?> clz = ClassUtils.loadClass(className);
+    if (clz == null) {
+      throw new SqoopException(CoreError.CORE_0009, className);
+    }
+    try {
+      partition = (Partition) clz.newInstance();
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0010, className, e);
+    }
+    // let the new Partition read its own content from the stream
+    partition.readFields(in);
+  }
+  /** Writes the partition's class name, then its content; throws an NPE if none is set. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write the Partition class name first so readFields() knows what to instantiate
+    out.writeUTF(partition.getClass().getName());
+    // then let the Partition write its own content
+    partition.write(out);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
new file mode 100644
index 0000000..e685883
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+/** Test helper with static shortcuts for Hadoop FileSystem calls on string paths. */
+public class FileUtils {
+  /** Returns whether {@code file} exists on the FileSystem its path resolves to. */
+  public static boolean exists(String file) throws IOException {
+    Path path = new Path(file);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    return fs.exists(path);
+  }
+  /** Deletes {@code file} if it exists, passing {@code true} as the recursive flag. */
+  public static void delete(String file) throws IOException {
+    Path path = new Path(file);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+  }
+  /** Creates {@code directory} with {@code mkdirs} unless the path already exists. */
+  public static void mkdirs(String directory) throws IOException {
+    Path path = new Path(directory);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    if (!fs.exists(path)) {
+      fs.mkdirs(path);
+    }
+  }
+  /** Opens {@code fileName} for reading; the returned stream is left open for the caller. */
+  public static InputStream open(String fileName)
+    throws IOException, ClassNotFoundException { // NOTE(review): nothing below appears to throw ClassNotFoundException - confirm
+    Path filepath = new Path(fileName);
+    FileSystem fs = filepath.getFileSystem(new Configuration());
+    return fs.open(filepath);
+  }
+  /** Creates {@code fileName} for writing, passing {@code false} as the overwrite flag. */
+  public static OutputStream create(String fileName) throws IOException {
+    Path filepath = new Path(fileName);
+    FileSystem fs = filepath.getFileSystem(new Configuration());
+    return fs.create(filepath, false);
+  }
+
+  private FileUtils() {
+    // Disable explicit object creation
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
new file mode 100644
index 0000000..e6ead3f
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import org.apache.sqoop.job.mr.SqoopSplit;
+/** Test helper that builds a Hadoop Job from a Configuration, runs it and asserts success. */
+public class JobUtils {
+  /** Runs with Sqoop's input format and mapper; file output is chosen only when OUTDIR is set. */
+  public static void runJob(Configuration conf)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+        (conf.get(FileOutputFormat.OUTDIR) != null) ?
+        SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
+  }
+  /** Runs a job with the given classes and fails a JUnit assertion unless it succeeds. */
+  public static void runJob(Configuration conf,
+      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+      Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper,
+      Class<? extends OutputFormat<Data, NullWritable>> output)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = Job.getInstance(conf);
+    job.setInputFormatClass(input);
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(Data.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(output);
+    job.setOutputKeyClass(Data.class);
+    job.setOutputValueClass(NullWritable.class);
+    // wait for the job to finish (the argument is waitForCompletion's verbose flag)
+    boolean success = job.waitForCompletion(true);
+    Assert.assertEquals("Job failed!", true, success);
+  }
+
+  private JobUtils() {
+    // Disable explicit object creation
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
new file mode 100644
index 0000000..c74faa2
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
+import org.apache.sqoop.job.etl.HdfsTextImportLoader;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.junit.Test;
+/** HDFS load test case; only testVoid is live, the real tests sit inside a block comment. */
+public class TestHdfsLoad extends TestCase {
+
+  private static final String OUTPUT_ROOT = "/tmp/sqoop/warehouse/";
+  private static final String OUTPUT_FILE = "part-r-00000";
+  private static final int START_ID = 1;
+  private static final int NUMBER_OF_IDS = 9;
+  private static final int NUMBER_OF_ROWS_PER_ID = 10;
+
+  private String outdir;
+  // OUTPUT_ROOT already ends with "/", so the extra "/" puts "//" before the class name.
+  public TestHdfsLoad() {
+    outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName();
+  }
+  // Only live test method; all remaining tests and fixtures are commented out below.
+  public void testVoid() {}
+  /*
+  @Test
+  public void testUncompressedText() throws Exception {
+    FileUtils.delete(outdir);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    conf.set(FileOutputFormat.OUTDIR, outdir);
+    JobUtils.runJob(conf);
+
+    String fileName = outdir + "/" +  OUTPUT_FILE;
+    InputStream filestream = FileUtils.open(fileName);
+    BufferedReader filereader = new BufferedReader(new InputStreamReader(
+        filestream, Data.CHARSET_NAME));
+    verifyOutputText(filereader);
+  }
+
+  @Test
+  public void testCompressedText() throws Exception {
+    FileUtils.delete(outdir);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    conf.set(FileOutputFormat.OUTDIR, outdir);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+    JobUtils.runJob(conf);
+
+    Class<? extends CompressionCodec> codecClass = conf.getClass(
+        FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
+        .asSubclass(CompressionCodec.class);
+    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+    String fileName = outdir + "/" +  OUTPUT_FILE + codec.getDefaultExtension();
+    InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
+    BufferedReader filereader = new BufferedReader(new InputStreamReader(
+        filestream, Data.CHARSET_NAME));
+    verifyOutputText(filereader);
+  }
+
+  private void verifyOutputText(BufferedReader reader) throws IOException {
+    String actual = null;
+    String expected;
+    Data data = new Data();
+    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    while ((actual = reader.readLine()) != null){
+      data.setContent(new Object[] {
+          new Integer(index), new Double(index), String.valueOf(index) },
+          Data.ARRAY_RECORD);
+      expected = data.toString();
+      index++;
+
+      assertEquals(expected, actual);
+    }
+    reader.close();
+
+    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
+        index-START_ID*NUMBER_OF_ROWS_PER_ID);
+  }
+
+  @Test
+  public void testUncompressedSequence() throws Exception {
+    FileUtils.delete(outdir);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+    conf.set(FileOutputFormat.OUTDIR, outdir);
+    JobUtils.runJob(conf);
+
+    Path filepath = new Path(outdir,
+        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
+    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(filepath));
+    verifyOutputSequence(filereader);
+  }
+
+  @Test
+  public void testCompressedSequence() throws Exception {
+    FileUtils.delete(outdir);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+    conf.set(FileOutputFormat.OUTDIR, outdir);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+    JobUtils.runJob(conf);
+
+    Path filepath = new Path(outdir,
+        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
+    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(filepath));
+    verifyOutputSequence(filereader);
+  }
+
+  private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
+    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    Text actual = new Text();
+    Text expected = new Text();
+    Data data = new Data();
+    while (reader.next(actual)){
+      data.setContent(new Object[] {
+          new Integer(index), new Double(index), String.valueOf(index) },
+          Data.ARRAY_RECORD);
+      expected.set(data.toString());
+      index++;
+
+      assertEquals(expected.toString(), actual.toString());
+    }
+    reader.close();
+
+    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
+        index-START_ID*NUMBER_OF_ROWS_PER_ID);
+  }
+
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> initialize(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void initialize(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
+        Object[] array = new Object[] {
+          new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
+          new Double(id*NUMBER_OF_ROWS_PER_ID+row),
+          String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
+        };
+        writer.writeArrayRecord(array);
+      }
+    }
+  }
+  */
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java
new file mode 100644
index 0000000..51dddb4
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MJob.Type;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.validation.Validator;
+import org.junit.Test;
+/** Job engine test case; only testVoid is live, the import test sits inside a block comment. */
+public class TestJobEngine extends TestCase {
+
+  private static final String DATA_DIR = TestJobEngine.class.getSimpleName();
+  private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/";
+
+  private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR;
+  private static final String OUTPUT_FILE = "part-r-00000";
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 9;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+  // Only live test method; the import test and its dummy classes are commented out below.
+  public void testVoid() { }
+/*
+  @Test
+  public void testImport() throws Exception {
+    FileUtils.delete(OUTPUT_DIR);
+
+    DummyConnector connector = new DummyConnector();
+    EtlOptions options = new EtlOptions(connector);
+
+    JobEngine engine = new JobEngine();
+    engine.initialize(options);
+
+    String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE;
+    InputStream filestream = FileUtils.open(fileName);
+    BufferedReader filereader = new BufferedReader(new InputStreamReader(
+        filestream, Data.CHARSET_NAME));
+    verifyOutput(filereader);
+  }
+
+  private void verifyOutput(BufferedReader reader)
+      throws IOException {
+    String line = null;
+    int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+    Data expected = new Data();
+    while ((line = reader.readLine()) != null){
+      expected.setContent(new Object[] {
+          new Integer(index),
+          new Double(index),
+          String.valueOf(index) },
+          Data.ARRAY_RECORD);
+      index++;
+
+      assertEquals(expected.toString(), line);
+    }
+    reader.close();
+
+    assertEquals(NUMBER_OF_PARTITIONS*NUMBER_OF_ROWS_PER_PARTITION,
+        index-START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION);
+  }
+
+  public class DummyConnector implements SqoopConnector {
+
+    @Override
+    public Importer getImporter() {
+      return new Importer(
+          DummyImportInitializer.class,
+          DummyImportPartitioner.class,
+          DummyImportExtractor.class,
+          null);
+    }
+
+    @Override
+    public Exporter getExporter() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public ResourceBundle getBundle(Locale locale) {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Validator getValidator() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Class getConnectionConfigurationClass() {
+      fail("This method should not be invoked.");
+      return null;
+    }
+
+    @Override
+    public Class getJobConfigurationClass(Type jobType) {
+      fail("This method should not be invoked.");
+      return null;
+    }
+  }
+
+  public static class DummyImportInitializer extends Initializer {
+    @Override
+    public void initialize(MutableContext context, Options options) {
+      context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR);
+    }
+  }
+
+  public static class DummyImportPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  public static class DummyImportPartitioner extends Partitioner {
+    @Override
+    public List<Partition> initialize(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+        DummyImportPartition partition = new DummyImportPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  public static class DummyImportExtractor extends Extractor {
+    @Override
+    public void initialize(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyImportPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+        writer.writeArrayRecord(new Object[] {
+            new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+      }
+    }
+  }
+*/
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
new file mode 100644
index 0000000..94ab560
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import org.apache.sqoop.job.mr.SqoopSplit;
+import org.junit.Test;
+/** MapReduce test case; only testVoid is live, the real tests sit inside a block comment. */
+public class TestMapReduce extends TestCase {
+
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 9;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+  // Only live test method; all remaining tests and fixtures are commented out below.
+  public void testVoid() {}
+
+  /*
+  @Test
+  public void testInputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    Job job = Job.getInstance(conf);
+
+    SqoopInputFormat inputformat = new SqoopInputFormat();
+    List<InputSplit> splits = inputformat.getSplits(job);
+    assertEquals(9, splits.size());
+
+    for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+      SqoopSplit split = (SqoopSplit)splits.get(id-1);
+      DummyPartition partition = (DummyPartition)split.getPartition();
+      assertEquals(id, partition.getId());
+    }
+  }
+
+  @Test
+  public void testMapper() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+
+    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+        DummyOutputFormat.class);
+  }
+
+  @Test
+  public void testOutputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+
+    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+        SqoopNullOutputFormat.class);
+  }
+
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> initialize(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void initialize(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+        writer.writeArrayRecord(new Object[] {
+            new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+      }
+    }
+  }
+
+  public static class DummyOutputFormat
+      extends OutputFormat<Data, NullWritable> {
+    @Override
+    public void checkOutputSpecs(JobContext context) {
+      // do nothing
+    }
+
+    @Override
+    public RecordWriter<Data, NullWritable> getRecordWriter(
+        TaskAttemptContext context) {
+      return new DummyRecordWriter();
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+      return new DummyOutputCommitter();
+    }
+
+    public static class DummyRecordWriter
+        extends RecordWriter<Data, NullWritable> {
+      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+      private Data data = new Data();
+
+      @Override
+      public void write(Data key, NullWritable value) {
+        data.setContent(new Object[] {
+          new Integer(index),
+          new Double(index),
+          String.valueOf(index)},
+          Data.ARRAY_RECORD);
+        index++;
+
+        assertEquals(data.toString(), key.toString());
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {
+        // do nothing
+      }
+    }
+
+    public static class DummyOutputCommitter extends OutputCommitter {
+      @Override
+      public void setupJob(JobContext jobContext) { }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+    }
+  }
+
+  public static class DummyLoader extends Loader {
+    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+    private Data expected = new Data();
+    private Data actual = new Data();
+
+    @Override
+    public void initialize(Context context, DataReader reader) {
+      Object[] array;
+      while ((array = reader.readArrayRecord()) != null) {
+        actual.setContent(array, Data.ARRAY_RECORD);
+
+        expected.setContent(new Object[] {
+          new Integer(index),
+          new Double(index),
+          String.valueOf(index)},
+          Data.ARRAY_RECORD);
+        index++;
+
+        assertEquals(expected.toString(), actual.toString());
+      };
+    }
+  }
+  */
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
new file mode 100644
index 0000000..d4a7d4d
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+
+public class TestData extends TestCase { // exercises Data's ARRAY_RECORD -> CSV_RECORD conversion
+
+  private static final double TEST_NUMBER = Math.PI + 100; // non-integral sample; also truncated to long below
+  @Test
+  public void testArrayToCsv() throws Exception {
+    Data data = new Data();
+    String expected;
+    String actual;
+
+    // with special characters: the string is expected wrapped in single quotes with its embedded quote backslash-escaped
+    expected =
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5}); // byte[] is expected in Arrays.toString form, i.e. [1, 2, 3, 4, 5]
+    data.setContent(new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        String.valueOf(TEST_NUMBER) + "',s",
+        new byte[] {1, 2, 3, 4, 5} },
+        Data.ARRAY_RECORD);
+    actual = (String)data.getContent(Data.CSV_RECORD);
+    assertEquals(expected, actual);
+
+    // with a null element: it is expected to appear as the unquoted text null
+    expected =
+        Long.valueOf((long)TEST_NUMBER) + "," +
+        Double.valueOf(TEST_NUMBER) + "," +
+        "null" + "," +
+        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+    data.setContent(new Object[] {
+        Long.valueOf((long)TEST_NUMBER),
+        Double.valueOf(TEST_NUMBER),
+        null,
+        new byte[] {1, 2, 3, 4, 5} },
+        Data.ARRAY_RECORD);
+    actual = (String)data.getContent(Data.CSV_RECORD);
+    assertEquals(expected, actual);
+  }
+
+  public static void assertEquals(Object expected, Object actual) { // Object comparison that special-cases byte[]
+    if (expected instanceof byte[]) { // byte arrays are compared through their Arrays.toString text
+      assertEquals(Arrays.toString((byte[])expected),
+          Arrays.toString((byte[])actual)); // NOTE(review): unguarded cast; a non-null actual of another type throws ClassCastException
+    } else {
+      TestCase.assertEquals(expected, actual); // everything else is delegated to TestCase's comparison
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/pom.xml
----------------------------------------------------------------------
diff --git a/execution/pom.xml b/execution/pom.xml
new file mode 100644
index 0000000..fb9f801
--- /dev/null
+++ b/execution/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent> <!-- presumably the top-level Sqoop POM that lists this module; confirm coordinates there -->
+    <groupId>org.apache</groupId>
+    <artifactId>sqoop</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop</groupId>
+  <artifactId>execution</artifactId>
+  <name>Sqoop Execution Engines</name>
+  <packaging>pom</packaging> <!-- aggregator module: builds no jar of its own -->
+
+  <modules>
+    <module>mapreduce</module> <!-- the only execution engine submodule declared here -->
+  </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a4915fd..4211333 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,6 +220,7 @@ limitations under the License.
     <module>client</module>
     <module>docs</module>
     <module>connector</module>
+    <module>execution</module>
     <module>submission</module>
     <module>dist</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/submission/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/submission/mapreduce/pom.xml b/submission/mapreduce/pom.xml
index 03c06c0..f8a7d3d 100644
--- a/submission/mapreduce/pom.xml
+++ b/submission/mapreduce/pom.xml
@@ -37,6 +37,12 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.sqoop.execution</groupId>
+      <artifactId>sqoop-execution-mapreduce</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 94098de..b8415e3 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
+import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
 import org.apache.sqoop.framework.SubmissionRequest;
 import org.apache.sqoop.framework.SubmissionEngine;
 import org.apache.sqoop.job.JobConstants;
@@ -116,8 +118,22 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
    * {@inheritDoc}
    */
   @Override
-  @SuppressWarnings("unchecked")
-  public boolean submit(SubmissionRequest request) {
+  public boolean isExecutionEngineSupported(Class executionEngineClass) { // true only for MapreduceExecutionEngine
+    if(executionEngineClass == MapreduceExecutionEngine.class) { // identity check: subclasses and null do not match
+      return true;
+    }
+
+    return false; // every other engine class is reported as unsupported
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean submit(SubmissionRequest generalRequest) {
+    // We're supporting only map reduce jobs
+    MRSubmissionRequest request = (MRSubmissionRequest) generalRequest;
+
     // Clone global configuration
     Configuration configuration = new Configuration(globalConfiguration);