Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/06 21:16:09 UTC
[2/3] SQOOP-666 Separate execution engine module (Jarek Jarcec Cecho)
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
new file mode 100644
index 0000000..d236148
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * A reducer that passes records straight through; it adds no reduce-side logic yet.
+ */
+public class SqoopReducer
+ extends Reducer<Data, NullWritable, Data, NullWritable> {
+
+ public static final Log LOG =
+ LogFactory.getLog(SqoopReducer.class.getName());
+
+}
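
SqoopReducer overrides none of Reducer's methods, so for now it behaves as
Hadoop's identity reducer and mainly reserves a hook for future reduce-side
logic. A minimal sketch of registering it on a job (hypothetical driver code,
not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.sqoop.job.io.Data;
    import org.apache.sqoop.job.mr.SqoopReducer;

    public class ReducerWiringSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setReducerClass(SqoopReducer.class); // identity pass-through for now
        job.setOutputKeyClass(Data.class);
        job.setOutputValueClass(NullWritable.class);
      }
    }
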
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
new file mode 100644
index 0000000..7dc9541
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * An input split that carries a single connector-defined {@link Partition} to be read.
+ */
+public class SqoopSplit extends InputSplit implements Writable {
+
+ private Partition partition;
+
+ public void setPartition(Partition partition) {
+ this.partition = partition;
+ }
+
+ public Partition getPartition() {
+ return partition;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return new String[] {};
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // read Partition class name
+ String className = in.readUTF();
+ // instantiate Partition object
+ Class<?> clz = ClassUtils.loadClass(className);
+ if (clz == null) {
+ throw new SqoopException(CoreError.CORE_0009, className);
+ }
+ try {
+ partition = (Partition) clz.newInstance();
+ } catch (Exception e) {
+ throw new SqoopException(CoreError.CORE_0010, className, e);
+ }
+ // read Partition object content
+ partition.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // write Partition class name
+ out.writeUTF(partition.getClass().getName());
+ // write Partition object content
+ partition.write(out);
+ }
+
+}
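
SqoopSplit serializes the concrete Partition class name before the partition
payload, so a map task can reflectively re-create the right subclass and then
let it read its own fields. A round-trip sketch, assuming a concrete Partition
much like the DummyPartition classes in the tests below:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.sqoop.job.etl.Partition;
    import org.apache.sqoop.job.mr.SqoopSplit;

    public class SplitRoundTripSketch {
      // Minimal concrete Partition, mirroring DummyPartition in the tests.
      public static class IdPartition extends Partition {
        int id;
        @Override
        public void readFields(DataInput in) throws IOException { id = in.readInt(); }
        @Override
        public void write(DataOutput out) throws IOException { out.writeInt(id); }
      }

      public static void main(String[] args) throws Exception {
        IdPartition partition = new IdPartition();
        partition.id = 42;
        SqoopSplit split = new SqoopSplit();
        split.setPartition(partition);

        // write(): class name first, then the partition's own fields
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        split.write(new DataOutputStream(buffer));

        // readFields(): the class name drives reflective instantiation
        SqoopSplit restored = new SqoopSplit();
        restored.readFields(new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(((IdPartition) restored.getPartition()).id); // 42
      }
    }
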
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
new file mode 100644
index 0000000..e685883
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FileUtils {
+
+ public static boolean exists(String file) throws IOException {
+ Path path = new Path(file);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ return fs.exists(path);
+ }
+
+ public static void delete(String file) throws IOException {
+ Path path = new Path(file);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ }
+
+ public static void mkdirs(String directory) throws IOException {
+ Path path = new Path(directory);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ if (!fs.exists(path)) {
+ fs.mkdirs(path);
+ }
+ }
+
+ public static InputStream open(String fileName)
+ throws IOException, ClassNotFoundException {
+ Path filepath = new Path(fileName);
+ FileSystem fs = filepath.getFileSystem(new Configuration());
+ return fs.open(filepath);
+ }
+
+ public static OutputStream create(String fileName) throws IOException {
+ Path filepath = new Path(fileName);
+ FileSystem fs = filepath.getFileSystem(new Configuration());
+ return fs.create(filepath, false);
+ }
+
+ private FileUtils() {
+    // Private constructor: this utility class is not meant to be instantiated
+ }
+
+}
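
Every FileUtils helper resolves its FileSystem from the path itself, so the
same code works against the local file system and HDFS URIs alike. A short
usage sketch (paths are hypothetical):

    import java.io.OutputStream;
    import org.apache.sqoop.job.FileUtils;

    public class FileUtilsSketch {
      public static void main(String[] args) throws Exception {
        String file = "/tmp/sqoop-test/data.txt";
        FileUtils.mkdirs("/tmp/sqoop-test");
        OutputStream out = FileUtils.create(file); // fails if the file exists
        out.write("hello".getBytes("UTF-8"));
        out.close();
        System.out.println(FileUtils.exists(file)); // true
        FileUtils.delete(file);                     // recursive delete
      }
    }
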
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
new file mode 100644
index 0000000..e6ead3f
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import org.apache.sqoop.job.mr.SqoopSplit;
+
+public class JobUtils {
+
+ public static void runJob(Configuration conf)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+ (conf.get(FileOutputFormat.OUTDIR) != null) ?
+ SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
+ }
+
+ public static void runJob(Configuration conf,
+ Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+ Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper,
+ Class<? extends OutputFormat<Data, NullWritable>> output)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = Job.getInstance(conf);
+ job.setInputFormatClass(input);
+ job.setMapperClass(mapper);
+ job.setMapOutputKeyClass(Data.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(output);
+ job.setOutputKeyClass(Data.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ boolean success = job.waitForCompletion(true);
+ Assert.assertEquals("Job failed!", true, success);
+ }
+
+ private JobUtils() {
+    // Private constructor: this utility class is not meant to be instantiated
+ }
+
+}
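
The one-argument runJob picks the output format from the configuration: with
FileOutputFormat.OUTDIR set it writes files through SqoopFileOutputFormat,
otherwise it discards output via SqoopNullOutputFormat. A usage sketch along
the lines of the disabled tests below (the com.example names are hypothetical
stand-ins for real connector classes):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.sqoop.job.JobConstants;
    import org.apache.sqoop.job.JobUtils;
    import org.apache.sqoop.job.etl.HdfsTextImportLoader;

    public class RunJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(JobConstants.JOB_ETL_PARTITIONER, "com.example.MyPartitioner");
        conf.set(JobConstants.JOB_ETL_EXTRACTOR, "com.example.MyExtractor");
        conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
        // Setting OUTDIR selects SqoopFileOutputFormat; leaving it unset
        // would select SqoopNullOutputFormat instead.
        conf.set(FileOutputFormat.OUTDIR, "/tmp/sqoop/out");
        JobUtils.runJob(conf);
      }
    }
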
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
new file mode 100644
index 0000000..c74faa2
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
+import org.apache.sqoop.job.etl.HdfsTextImportLoader;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.junit.Test;
+
+public class TestHdfsLoad extends TestCase {
+
+ private static final String OUTPUT_ROOT = "/tmp/sqoop/warehouse/";
+ private static final String OUTPUT_FILE = "part-r-00000";
+ private static final int START_ID = 1;
+ private static final int NUMBER_OF_IDS = 9;
+ private static final int NUMBER_OF_ROWS_PER_ID = 10;
+
+ private String outdir;
+
+ public TestHdfsLoad() {
+ outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName();
+ }
+
+  public void testVoid() {} // placeholder: JUnit 3 needs at least one test; the real tests below are disabled
+ /*
+ @Test
+ public void testUncompressedText() throws Exception {
+ FileUtils.delete(outdir);
+
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+ conf.set(FileOutputFormat.OUTDIR, outdir);
+ JobUtils.runJob(conf);
+
+ String fileName = outdir + "/" + OUTPUT_FILE;
+ InputStream filestream = FileUtils.open(fileName);
+ BufferedReader filereader = new BufferedReader(new InputStreamReader(
+ filestream, Data.CHARSET_NAME));
+ verifyOutputText(filereader);
+ }
+
+ @Test
+ public void testCompressedText() throws Exception {
+ FileUtils.delete(outdir);
+
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+ conf.set(FileOutputFormat.OUTDIR, outdir);
+ conf.setBoolean(FileOutputFormat.COMPRESS, true);
+ JobUtils.runJob(conf);
+
+ Class<? extends CompressionCodec> codecClass = conf.getClass(
+ FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
+ .asSubclass(CompressionCodec.class);
+ CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+ String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
+ InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
+ BufferedReader filereader = new BufferedReader(new InputStreamReader(
+ filestream, Data.CHARSET_NAME));
+ verifyOutputText(filereader);
+ }
+
+ private void verifyOutputText(BufferedReader reader) throws IOException {
+ String actual = null;
+ String expected;
+ Data data = new Data();
+ int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+ while ((actual = reader.readLine()) != null){
+ data.setContent(new Object[] {
+ new Integer(index), new Double(index), String.valueOf(index) },
+ Data.ARRAY_RECORD);
+ expected = data.toString();
+ index++;
+
+ assertEquals(expected, actual);
+ }
+ reader.close();
+
+ assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
+ index-START_ID*NUMBER_OF_ROWS_PER_ID);
+ }
+
+ @Test
+ public void testUncompressedSequence() throws Exception {
+ FileUtils.delete(outdir);
+
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+ conf.set(FileOutputFormat.OUTDIR, outdir);
+ JobUtils.runJob(conf);
+
+ Path filepath = new Path(outdir,
+ OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
+ SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(filepath));
+ verifyOutputSequence(filereader);
+ }
+
+ @Test
+ public void testCompressedSequence() throws Exception {
+ FileUtils.delete(outdir);
+
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+ conf.set(FileOutputFormat.OUTDIR, outdir);
+ conf.setBoolean(FileOutputFormat.COMPRESS, true);
+ JobUtils.runJob(conf);
+
+ Path filepath = new Path(outdir,
+ OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
+ SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(filepath));
+ verifyOutputSequence(filereader);
+ }
+
+ private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
+ int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+ Text actual = new Text();
+ Text expected = new Text();
+ Data data = new Data();
+ while (reader.next(actual)){
+ data.setContent(new Object[] {
+ new Integer(index), new Double(index), String.valueOf(index) },
+ Data.ARRAY_RECORD);
+ expected.set(data.toString());
+ index++;
+
+ assertEquals(expected.toString(), actual.toString());
+ }
+ reader.close();
+
+ assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
+ index-START_ID*NUMBER_OF_ROWS_PER_ID);
+ }
+
+ public static class DummyPartition extends Partition {
+ private int id;
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(id);
+ }
+ }
+
+ public static class DummyPartitioner extends Partitioner {
+ @Override
+ public List<Partition> initialize(Context context) {
+ List<Partition> partitions = new LinkedList<Partition>();
+ for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+ DummyPartition partition = new DummyPartition();
+ partition.setId(id);
+ partitions.add(partition);
+ }
+ return partitions;
+ }
+ }
+
+ public static class DummyExtractor extends Extractor {
+ @Override
+ public void initialize(Context context, Partition partition, DataWriter writer) {
+ int id = ((DummyPartition)partition).getId();
+ for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
+ Object[] array = new Object[] {
+ new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
+ new Double(id*NUMBER_OF_ROWS_PER_ID+row),
+ String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
+ };
+ writer.writeArrayRecord(array);
+ }
+ }
+ }
+ */
+}
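
A quick sanity check of the disabled tests' arithmetic: with START_ID = 1,
NUMBER_OF_IDS = 9 and NUMBER_OF_ROWS_PER_ID = 10, DummyExtractor emits
id*10+row for ids 1..9 and rows 0..9, i.e. the values 10 through 99, and the
verify methods assert exactly 9*10 = 90 records starting at index 1*10 = 10.
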
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java
new file mode 100644
index 0000000..51dddb4
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MJob.Type;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.validation.Validator;
+import org.junit.Test;
+
+public class TestJobEngine extends TestCase {
+
+ private static final String DATA_DIR = TestJobEngine.class.getSimpleName();
+ private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/";
+
+ private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR;
+ private static final String OUTPUT_FILE = "part-r-00000";
+ private static final int START_PARTITION = 1;
+ private static final int NUMBER_OF_PARTITIONS = 9;
+ private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+
+  public void testVoid() { } // placeholder while the tests below are disabled
+/*
+ @Test
+ public void testImport() throws Exception {
+ FileUtils.delete(OUTPUT_DIR);
+
+ DummyConnector connector = new DummyConnector();
+ EtlOptions options = new EtlOptions(connector);
+
+ JobEngine engine = new JobEngine();
+ engine.initialize(options);
+
+ String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE;
+ InputStream filestream = FileUtils.open(fileName);
+ BufferedReader filereader = new BufferedReader(new InputStreamReader(
+ filestream, Data.CHARSET_NAME));
+ verifyOutput(filereader);
+ }
+
+ private void verifyOutput(BufferedReader reader)
+ throws IOException {
+ String line = null;
+ int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+ Data expected = new Data();
+ while ((line = reader.readLine()) != null){
+ expected.setContent(new Object[] {
+ new Integer(index),
+ new Double(index),
+ String.valueOf(index) },
+ Data.ARRAY_RECORD);
+ index++;
+
+ assertEquals(expected.toString(), line);
+ }
+ reader.close();
+
+ assertEquals(NUMBER_OF_PARTITIONS*NUMBER_OF_ROWS_PER_PARTITION,
+ index-START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION);
+ }
+
+ public class DummyConnector implements SqoopConnector {
+
+ @Override
+ public Importer getImporter() {
+ return new Importer(
+ DummyImportInitializer.class,
+ DummyImportPartitioner.class,
+ DummyImportExtractor.class,
+ null);
+ }
+
+ @Override
+ public Exporter getExporter() {
+ fail("This method should not be invoked.");
+ return null;
+ }
+
+ @Override
+ public ResourceBundle getBundle(Locale locale) {
+ fail("This method should not be invoked.");
+ return null;
+ }
+
+ @Override
+ public Validator getValidator() {
+ fail("This method should not be invoked.");
+ return null;
+ }
+
+ @Override
+ public Class getConnectionConfigurationClass() {
+ fail("This method should not be invoked.");
+ return null;
+ }
+
+ @Override
+ public Class getJobConfigurationClass(Type jobType) {
+ fail("This method should not be invoked.");
+ return null;
+ }
+ }
+
+ public static class DummyImportInitializer extends Initializer {
+ @Override
+ public void initialize(MutableContext context, Options options) {
+ context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR);
+ }
+ }
+
+ public static class DummyImportPartition extends Partition {
+ private int id;
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(id);
+ }
+ }
+
+ public static class DummyImportPartitioner extends Partitioner {
+ @Override
+ public List<Partition> initialize(Context context) {
+ List<Partition> partitions = new LinkedList<Partition>();
+ for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+ DummyImportPartition partition = new DummyImportPartition();
+ partition.setId(id);
+ partitions.add(partition);
+ }
+ return partitions;
+ }
+ }
+
+ public static class DummyImportExtractor extends Extractor {
+ @Override
+ public void initialize(Context context, Partition partition, DataWriter writer) {
+ int id = ((DummyImportPartition)partition).getId();
+ for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+ writer.writeArrayRecord(new Object[] {
+ new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+ new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+ String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+ }
+ }
+ }
+*/
+}
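
DummyConnector pins down which part of the SqoopConnector SPI an import
actually exercises: getImporter() bundles the Initializer, Partitioner and
Extractor classes (the last constructor slot, presumably the destroyer, is
left null), while every other method fails the test if it is ever invoked.
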
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
new file mode 100644
index 0000000..94ab560
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import org.apache.sqoop.job.mr.SqoopSplit;
+import org.junit.Test;
+
+public class TestMapReduce extends TestCase {
+
+ private static final int START_PARTITION = 1;
+ private static final int NUMBER_OF_PARTITIONS = 9;
+ private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+
+  public void testVoid() {} // placeholder while the tests below are disabled
+
+ /*
+ @Test
+ public void testInputFormat() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ Job job = Job.getInstance(conf);
+
+ SqoopInputFormat inputformat = new SqoopInputFormat();
+ List<InputSplit> splits = inputformat.getSplits(job);
+ assertEquals(9, splits.size());
+
+ for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+ SqoopSplit split = (SqoopSplit)splits.get(id-1);
+ DummyPartition partition = (DummyPartition)split.getPartition();
+ assertEquals(id, partition.getId());
+ }
+ }
+
+ @Test
+ public void testMapper() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+
+ JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+ DummyOutputFormat.class);
+ }
+
+ @Test
+ public void testOutputFormat() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+
+ JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+ SqoopNullOutputFormat.class);
+ }
+
+ public static class DummyPartition extends Partition {
+ private int id;
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(id);
+ }
+ }
+
+ public static class DummyPartitioner extends Partitioner {
+ @Override
+ public List<Partition> initialize(Context context) {
+ List<Partition> partitions = new LinkedList<Partition>();
+ for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+ DummyPartition partition = new DummyPartition();
+ partition.setId(id);
+ partitions.add(partition);
+ }
+ return partitions;
+ }
+ }
+
+ public static class DummyExtractor extends Extractor {
+ @Override
+ public void initialize(Context context, Partition partition, DataWriter writer) {
+ int id = ((DummyPartition)partition).getId();
+ for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+ writer.writeArrayRecord(new Object[] {
+ new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+ new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
+ String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+ }
+ }
+ }
+
+ public static class DummyOutputFormat
+ extends OutputFormat<Data, NullWritable> {
+ @Override
+ public void checkOutputSpecs(JobContext context) {
+ // do nothing
+ }
+
+ @Override
+ public RecordWriter<Data, NullWritable> getRecordWriter(
+ TaskAttemptContext context) {
+ return new DummyRecordWriter();
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+ return new DummyOutputCommitter();
+ }
+
+ public static class DummyRecordWriter
+ extends RecordWriter<Data, NullWritable> {
+ private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+ private Data data = new Data();
+
+ @Override
+ public void write(Data key, NullWritable value) {
+ data.setContent(new Object[] {
+ new Integer(index),
+ new Double(index),
+ String.valueOf(index)},
+ Data.ARRAY_RECORD);
+ index++;
+
+ assertEquals(data.toString(), key.toString());
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) {
+ // do nothing
+ }
+ }
+
+ public static class DummyOutputCommitter extends OutputCommitter {
+ @Override
+ public void setupJob(JobContext jobContext) { }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) { }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+ return false;
+ }
+ }
+ }
+
+ public static class DummyLoader extends Loader {
+ private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+ private Data expected = new Data();
+ private Data actual = new Data();
+
+ @Override
+ public void initialize(Context context, DataReader reader) {
+ Object[] array;
+ while ((array = reader.readArrayRecord()) != null) {
+ actual.setContent(array, Data.ARRAY_RECORD);
+
+ expected.setContent(new Object[] {
+ new Integer(index),
+ new Double(index),
+ String.valueOf(index)},
+ Data.ARRAY_RECORD);
+ index++;
+
+ assertEquals(expected.toString(), actual.toString());
+      }
+ }
+ }
+ */
+}
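
The three disabled tests cover the pipeline stage by stage: testInputFormat
checks that SqoopInputFormat yields one SqoopSplit per Partition, testMapper
runs the extractor through SqoopMapper against a verifying DummyOutputFormat,
and testOutputFormat adds DummyLoader behind SqoopNullOutputFormat. Note that
the loader consumes records pull-style, looping on
DataReader.readArrayRecord() until it returns null.
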
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
new file mode 100644
index 0000000..d4a7d4d
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+
+public class TestData extends TestCase {
+
+ private static final double TEST_NUMBER = Math.PI + 100;
+ @Test
+ public void testArrayToCsv() throws Exception {
+ Data data = new Data();
+ String expected;
+ String actual;
+
+ // with special characters:
+ expected =
+ Long.valueOf((long)TEST_NUMBER) + "," +
+ Double.valueOf(TEST_NUMBER) + "," +
+ "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+ Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+ data.setContent(new Object[] {
+ Long.valueOf((long)TEST_NUMBER),
+ Double.valueOf(TEST_NUMBER),
+ String.valueOf(TEST_NUMBER) + "',s",
+ new byte[] {1, 2, 3, 4, 5} },
+ Data.ARRAY_RECORD);
+ actual = (String)data.getContent(Data.CSV_RECORD);
+ assertEquals(expected, actual);
+
+ // with null characters:
+ expected =
+ Long.valueOf((long)TEST_NUMBER) + "," +
+ Double.valueOf(TEST_NUMBER) + "," +
+ "null" + "," +
+ Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+ data.setContent(new Object[] {
+ Long.valueOf((long)TEST_NUMBER),
+ Double.valueOf(TEST_NUMBER),
+ null,
+ new byte[] {1, 2, 3, 4, 5} },
+ Data.ARRAY_RECORD);
+ actual = (String)data.getContent(Data.CSV_RECORD);
+ assertEquals(expected, actual);
+ }
+
+ public static void assertEquals(Object expected, Object actual) {
+ if (expected instanceof byte[]) {
+ assertEquals(Arrays.toString((byte[])expected),
+ Arrays.toString((byte[])actual));
+ } else {
+ TestCase.assertEquals(expected, actual);
+ }
+ }
+
+}
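
The test pins down the CSV encoding of Data: longs and doubles print bare,
byte arrays use the Arrays.toString form, a null field becomes the unquoted
literal null, and strings are wrapped in single quotes with embedded quotes
backslash-escaped, so a string value ending in ',s encodes as '...\',s'.
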
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/pom.xml
----------------------------------------------------------------------
diff --git a/execution/pom.xml b/execution/pom.xml
new file mode 100644
index 0000000..fb9f801
--- /dev/null
+++ b/execution/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>sqoop</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>execution</artifactId>
+ <name>Sqoop Execution Engines</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>mapreduce</module>
+ </modules>
+
+</project>
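
The new aggregator lists only the mapreduce module; the coordinates that
module presumably publishes (org.apache.sqoop.execution:
sqoop-execution-mapreduce:2.0.0-SNAPSHOT, its pom not shown in this part of
the patch) are exactly what the submission engine's pom starts depending on
below.
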
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a4915fd..4211333 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,6 +220,7 @@ limitations under the License.
<module>client</module>
<module>docs</module>
<module>connector</module>
+ <module>execution</module>
<module>submission</module>
<module>dist</module>
</modules>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/submission/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/submission/mapreduce/pom.xml b/submission/mapreduce/pom.xml
index 03c06c0..f8a7d3d 100644
--- a/submission/mapreduce/pom.xml
+++ b/submission/mapreduce/pom.xml
@@ -37,6 +37,12 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.sqoop.execution</groupId>
+ <artifactId>sqoop-execution-mapreduce</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 94098de..b8415e3 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
+import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.SubmissionEngine;
import org.apache.sqoop.job.JobConstants;
@@ -116,8 +118,22 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
* {@inheritDoc}
*/
@Override
- @SuppressWarnings("unchecked")
- public boolean submit(SubmissionRequest request) {
+ public boolean isExecutionEngineSupported(Class executionEngineClass) {
+ if(executionEngineClass == MapreduceExecutionEngine.class) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean submit(SubmissionRequest generalRequest) {
+ // We're supporting only map reduce jobs
+ MRSubmissionRequest request = (MRSubmissionRequest) generalRequest;
+
// Clone global configuration
Configuration configuration = new Configuration(globalConfiguration);
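
With the signature change the @SuppressWarnings("unchecked") annotation goes
away, and submit() instead downcasts its argument to MRSubmissionRequest. The
cast is safe only under the framework's contract, presumably that a request
is routed to this engine after isExecutionEngineSupported() has accepted the
matching MapreduceExecutionEngine.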