Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/08/31 08:24:06 UTC

svn commit: r1379307 - in /sqoop/branches/sqoop2: connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/ core/ core/src/main/java/org/apache/sqoop/core/ core/src/main/java/org/apache/sqoop/job/ core/src/main/java/org/apache/sqo...

Author: jarcec
Date: Fri Aug 31 06:24:05 2012
New Revision: 1379307

URL: http://svn.apache.org/viewvc?rev=1379307&view=rev
Log:
SQOOP-588. Provide MapReduce classes for executing ETL classes.

(Bilung Lee via Jarek Jarcec Cecho)

Added:
    sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
    sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/
    sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
Modified:
    sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
    sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
    sqoop/branches/sqoop2/core/pom.xml
    sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java
    sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
    sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
    sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
    sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java

Modified: sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java (original)
+++ sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java Fri Aug 31 06:24:05 2012
@@ -20,7 +20,6 @@ package org.apache.sqoop.connector.jdbc;
 import org.apache.sqoop.job.etl.Context;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.job.io.DataWriter;
 
 public class GenericJdbcExportLoader extends Loader {
 

Added: sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java (added)
+++ sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.sqoop.job.etl.Partition;
+
+public class GenericJdbcImportPartition extends Partition {
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODO Auto-generated method stub
+  }
+
+}

Modified: sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java (original)
+++ sqoop/branches/sqoop2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java Fri Aug 31 06:24:05 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.sqoop.job.etl.Context;
@@ -28,7 +29,7 @@ public class GenericJdbcImportPartitione
   @Override
   public List<Partition> run(Context context) {
     // TODO Auto-generated method stub
-    return null;
+    return new LinkedList<Partition>();
   }
 
 }

Modified: sqoop/branches/sqoop2/core/pom.xml
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/pom.xml?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/core/pom.xml (original)
+++ sqoop/branches/sqoop2/core/pom.xml Fri Aug 31 06:24:05 2012
@@ -43,6 +43,16 @@ limitations under the License.
       <version>2.0.0-SNAPSHOT</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>commons-dbcp</groupId>
       <artifactId>commons-dbcp</artifactId>
     </dependency>

Modified: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java (original)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/core/CoreError.java Fri Aug 31 06:24:05 2012
@@ -49,7 +49,43 @@ public enum CoreError implements ErrorCo
   CORE_0006("Properties configuration provider unable to load config file"),
 
   /** The configuration system has not been initialized correctly. */
-  CORE_0007("System not initialized");
+  CORE_0007("System not initialized"),
+
+  /** An error occurred during job execution. */
+  CORE_0008("Error occurred during job execution"),
+
+  /** The system was unable to load the specified class. */
+  CORE_0009("Unable to load the specified class"),
+
+  /** The system was unable to instantiate the specified class. */
+  CORE_0010("Unable to instantiate the specified class"),
+
+  /** The parameter already exists in the context */
+  CORE_0011("The parameter already exists in the context"),
+
+  /** The data type is not supported */
+  CORE_0012("The data type is not supported"),
+
+  /** Cannot write to the data writer */
+  CORE_0013("Cannot write to the data writer"),
+
+  /** Cannot read from the data reader */
+  CORE_0014("Cannot read from the data reader"),
+
+  /** Unable to write data due to interrupt */
+  CORE_0015("Unable to write data due to interrupt"),
+
+  /** Unable to read data due to interrupt */
+  CORE_0016("Unable to read data due to interrupt"),
+
+  /** An error occurred during the extractor run */
+  CORE_0017("Error occurred during extractor run"),
+
+  /** An error occurred during the loader run */
+  CORE_0018("Error occurred during loader run"),
+
+  /** Data have not been completely consumed yet */
+  CORE_0019("Data have not been completely consumed yet");
 
   private final String message;
 

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/JobConstants.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.core.ConfigurationConstants;
+
+public final class JobConstants {
+
+  /**
+   * All job related configuration is prefixed with this:
+   * <tt>org.apache.sqoop.job.</tt>
+   */
+  public static final String PREFIX_JOB_CONFIG =
+      ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
+
+  public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
+      + "etl.partitioner";
+
+  public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
+      + "etl.extractor";
+
+  public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+      + "etl.loader";
+
+  public static final String JOB_ETL_FIELD_NAMES = PREFIX_JOB_CONFIG
+      + "etl.field.names";
+
+  public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_JOB_CONFIG
+      + "etl.output.directory";
+
+  private JobConstants() {
+    // Disable explicit object creation
+  }
+}
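
For illustration only (not part of this patch), this is how the keys above are
meant to be used when wiring up a job; DummyPartitioner, DummyExtractor and
DummyLoader stand in for any ETL implementations (see TestMapReduce below):

  Configuration conf = new Configuration();
  // Tell the framework which ETL classes to instantiate at runtime.
  conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
  conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
  conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
  Job job = Job.getInstance(conf);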

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.job.JobConstants;
+
+/**
+ * An immutable context used in the ETL framework
+ * for accessing configuration values.
+ */
+public class EtlContext implements Context {
+
+  protected Configuration conf;
+
+  public EtlContext(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String getString(String key) {
+    return conf.get(key);
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return StringUtils.getStrings(getString(JobConstants.JOB_ETL_FIELD_NAMES));
+  }
+
+}

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+
+/**
+ * A mutable context used in the ETL framework.
+ * (for example, configuration initialization)
+ */
+public class EtlMutableContext extends EtlContext implements MutableContext  {
+
+  public EtlMutableContext(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public void setString(String key, String value) {
+    if (conf.get(key) != null) {
+      throw new SqoopException(CoreError.CORE_0011, key);
+    }
+
+    conf.set(key, value);
+  }
+
+  @Override
+  public void setFieldNames(String[] names) {
+    setString(JobConstants.JOB_ETL_FIELD_NAMES, StringUtils.arrayToString(names));
+  }
+
+}
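
A minimal sketch (again, not part of the patch) of the two context classes
cooperating over the same Configuration:

  Configuration conf = new Configuration();
  EtlMutableContext mutableContext = new EtlMutableContext(conf);
  // Stored as a comma-separated string under JOB_ETL_FIELD_NAMES.
  mutableContext.setFieldNames(new String[] {"id", "name", "price"});

  EtlContext readOnlyContext = new EtlContext(conf);
  String[] names = readOnlyContext.getFieldNames();  // {"id", "name", "price"}

  // Keys are write-once: calling setString() twice for the same key
  // throws SqoopException(CoreError.CORE_0011).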

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/Data.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+
+public class Data implements WritableComparable<Data> {
+
+  // The content is an Object to accommodate different kinds of data.
+  // For example, it can be:
+  // - Object[] for an array of object record
+  // - String for a text of CSV record
+  private Object content = null;
+
+  private static final int EMPTY_DATA = 0;
+  private static final int CSV_RECORD = 1;
+  private static final int ARRAY_RECORD = 2;
+  private int type = EMPTY_DATA;
+
+  private static char FIELD_DELIMITER = ',';
+  private static char RECORD_DELIMITER = '\n';
+
+  public void setContent(Object content) {
+    if (content == null) {
+      this.type = EMPTY_DATA;
+    } else if (content instanceof String) {
+      this.type = CSV_RECORD;
+    } else if (content instanceof Object[]) {
+      this.type = ARRAY_RECORD;
+    } else {
+      throw new SqoopException(CoreError.CORE_0012,
+          content.getClass().getName());
+    }
+    this.content = content;
+  }
+
+  public Object getContent() {
+    return content;
+  }
+
+  public int getType() {
+    return type;
+  }
+
+  public boolean isEmpty() {
+    return (type == EMPTY_DATA);
+  }
+
+  @Override
+  public int compareTo(Data other) {
+    byte[] myBytes = toString().getBytes(Charset.forName("UTF-8"));
+    byte[] otherBytes = other.toString().getBytes(Charset.forName("UTF-8"));
+    return WritableComparator.compareBytes(
+        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Data)) {
+      return false;
+    }
+
+    Data data = (Data)other;
+    if (type != data.getType()) {
+      return false;
+    }
+
+    return toString().equals(data.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    switch (type) {
+    case CSV_RECORD:
+      result += 31 * content.hashCode();
+      return result;
+    case ARRAY_RECORD:
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        result += 31 * array[i].hashCode();
+      }
+      return result;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  @Override
+  public String toString() {
+    switch (type) {
+    case CSV_RECORD:
+      return (String)content + RECORD_DELIMITER;
+    case ARRAY_RECORD:
+      StringBuilder sb = new StringBuilder();
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        if (i != 0) {
+          sb.append(FIELD_DELIMITER);
+        }
+
+        if (array[i] instanceof String) {
+          sb.append("\'");
+          sb.append(((String)array[i]).replaceAll(
+              "\'", Matcher.quoteReplacement("\\\'")));
+          sb.append("\'");
+        } else if (array[i] instanceof byte[]) {
+          sb.append(Arrays.toString((byte[])array[i]));
+        } else {
+          sb.append(array[i].toString());
+        }
+      }
+      sb.append(RECORD_DELIMITER);
+      return sb.toString();
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    type = readType(in);
+    switch (type) {
+    case CSV_RECORD:
+      readCsv(in);
+      break;
+    case ARRAY_RECORD:
+      readArray(in);
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeType(out, type);
+    switch (type) {
+    case CSV_RECORD:
+      writeCsv(out);
+      break;
+    case ARRAY_RECORD:
+      writeArray(out);
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private int readType(DataInput in) throws IOException {
+    return WritableUtils.readVInt(in);
+  }
+
+  private void writeType(DataOutput out, int type) throws IOException {
+    WritableUtils.writeVInt(out, type);
+  }
+
+  private void readCsv(DataInput in) throws IOException {
+    content = in.readUTF();
+  }
+
+  private void writeCsv(DataOutput out) throws IOException {
+    out.writeUTF((String)content);
+  }
+
+  private void readArray(DataInput in) throws IOException {
+    // read number of columns
+    int columns = in.readInt();
+    content = new Object[columns];
+    Object[] array = (Object[])content;
+    // read each column
+    for (int i = 0; i < array.length; i++) {
+      int type = readType(in);
+      switch (type) {
+      case FieldTypes.UTF:
+        array[i] = in.readUTF();
+        break;
+
+      case FieldTypes.BIN:
+        int length = in.readInt();
+        byte[] bytes = new byte[length];
+        in.readFully(bytes);
+        array[i] = bytes;
+        break;
+
+      case FieldTypes.DOUBLE:
+        array[i] = in.readDouble();
+        break;
+
+      case FieldTypes.FLOAT:
+        array[i] = in.readFloat();
+        break;
+
+      case FieldTypes.LONG:
+        array[i] = in.readLong();
+        break;
+
+      case FieldTypes.INT:
+        array[i] = in.readInt();
+        break;
+
+      case FieldTypes.SHORT:
+        array[i] = in.readShort();
+        break;
+
+      case FieldTypes.CHAR:
+        array[i] = in.readChar();
+        break;
+
+      case FieldTypes.BYTE:
+        array[i] = in.readByte();
+        break;
+
+      case FieldTypes.BOOLEAN:
+        array[i] = in.readBoolean();
+        break;
+
+      case FieldTypes.NULL:
+        array[i] = null;
+        break;
+
+      default:
+        throw new IOException(
+          new SqoopException(CoreError.CORE_0012, Integer.toString(type))
+        );
+      }
+    }
+  }
+
+  private void writeArray(DataOutput out) throws IOException {
+    Object[] array = (Object[])content;
+    // write number of columns
+    out.writeInt(array.length);
+    // write each column
+    for (int i = 0; i < array.length; i++) {
+      if (array[i] instanceof String) {
+        writeType(out, FieldTypes.UTF);
+        out.writeUTF((String)array[i]);
+
+      } else if (array[i] instanceof byte[]) {
+        writeType(out, FieldTypes.BIN);
+        out.writeInt(((byte[])array[i]).length);
+        out.write((byte[])array[i]);
+
+      } else if (array[i] instanceof Double) {
+        writeType(out, FieldTypes.DOUBLE);
+        out.writeDouble((Double)array[i]);
+
+      } else if (array[i] instanceof Float) {
+        writeType(out, FieldTypes.FLOAT);
+        out.writeFloat((Float)array[i]);
+
+      } else if (array[i] instanceof Long) {
+        writeType(out, FieldTypes.LONG);
+        out.writeLong((Long)array[i]);
+
+      } else if (array[i] instanceof Integer) {
+        writeType(out, FieldTypes.INT);
+        out.writeInt((Integer)array[i]);
+
+      } else if (array[i] instanceof Short) {
+        writeType(out, FieldTypes.SHORT);
+        out.writeShort((Short)array[i]);
+
+      } else if (array[i] instanceof Character) {
+        writeType(out, FieldTypes.CHAR);
+        out.writeChar((Character)array[i]);
+
+      } else if (array[i] instanceof Byte) {
+        writeType(out, FieldTypes.BYTE);
+        out.writeByte((Byte)array[i]);
+
+      } else if (array[i] instanceof Boolean) {
+        writeType(out, FieldTypes.BOOLEAN);
+        out.writeBoolean((Boolean)array[i]);
+
+      } else if (array[i] == null) {
+        writeType(out, FieldTypes.NULL);
+
+      } else {
+        throw new IOException(
+          new SqoopException(
+              CoreError.CORE_0012, array[i].getClass().getName()
+          )
+        );
+      }
+    }
+  }
+
+}
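
A round-trip sketch for the Writable contract above (illustrative only; the
stream plumbing is plain java.io):

  Data written = new Data();
  written.setContent(new Object[] {"alice", 42, true});  // ARRAY_RECORD

  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  written.write(new DataOutputStream(buffer));  // type tag, column count, columns

  Data read = new Data();
  read.readFields(new DataInputStream(
      new ByteArrayInputStream(buffer.toByteArray())));
  // read.getContent() is again an Object[] {"alice", 42, true};
  // read.toString() renders it as 'alice',42,true plus a trailing newline.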

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+public final class FieldTypes {
+
+  public static final int NULL    = 0;
+
+  public static final int BOOLEAN = 1;
+
+  public static final int BYTE    = 10;
+  public static final int CHAR    = 11;
+
+  public static final int SHORT   = 20;
+  public static final int INT     = 21;
+  public static final int LONG    = 22;
+
+  public static final int FLOAT   = 50;
+  public static final int DOUBLE  = 51;
+
+  public static final int BIN     = 100;
+  public static final int UTF     = 101;
+
+  private FieldTypes() {
+    // Disable explicit object creation
+  }
+}

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * An output format for MapReduce job.
+ */
+public class SqoopFileOutputFormat
+    extends FileOutputFormat<Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopFileOutputFormat.class.getName());
+
+  @Override
+  public RecordWriter<Data, NullWritable> getRecordWriter(
+      TaskAttemptContext context) {
+    SqoopOutputFormatLoadExecutor executor =
+        new SqoopOutputFormatLoadExecutor(context);
+    return executor.getRecordWriter();
+  }
+
+}

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+/**
+ * An InputFormat for MapReduce job.
+ */
+public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopInputFormat.class.getName());
+
+  @Override
+  public RecordReader<SqoopSplit, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) {
+    return new SqoopRecordReader();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
+    Class<?> clz = ClassLoadingUtils.loadClass(partitionerName);
+    if (clz == null) {
+      throw new SqoopException(CoreError.CORE_0009, partitionerName);
+    }
+
+    Partitioner partitioner;
+    try {
+      partitioner = (Partitioner) clz.newInstance();
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0010, partitionerName, e);
+    }
+
+    List<Partition> partitions = partitioner.run(new EtlContext(conf));
+    List<InputSplit> splits = new LinkedList<InputSplit>();
+    for (Partition partition : partitions) {
+      SqoopSplit split = new SqoopSplit();
+      split.setPartition(partition);
+      splits.add(split);
+    }
+
+    return splits;
+  }
+
+  public static class SqoopRecordReader
+      extends RecordReader<SqoopSplit, NullWritable> {
+
+    private boolean delivered = false;
+    private SqoopSplit split = null;
+
+    @Override
+    public boolean nextKeyValue() {
+      if (delivered) {
+        return false;
+      } else {
+        delivered = true;
+        return true;
+      }
+    }
+
+    @Override
+    public SqoopSplit getCurrentKey() {
+      return split;
+    }
+
+    @Override
+    public NullWritable getCurrentValue() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public float getProgress() {
+      return delivered ? 1.0f : 0.0f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+      this.split = (SqoopSplit)split;
+    }
+  }
+
+}

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+/**
+ * A mapper to perform map function.
+ */
+public class SqoopMapper
+    extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopMapper.class.getName());
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
+    Class<?> clz = ClassLoadingUtils.loadClass(extractorName);
+    if (clz == null) {
+      throw new SqoopException(CoreError.CORE_0009, extractorName);
+    }
+
+    Extractor extractor;
+    try {
+      extractor = (Extractor) clz.newInstance();
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0010, extractorName, e);
+    }
+
+    SqoopSplit split = context.getCurrentKey();
+
+    try {
+      extractor.run(new EtlContext(conf), split.getPartition(),
+          new MapDataWriter(context));
+
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0017, e);
+    }
+  }
+
+  public class MapDataWriter extends DataWriter {
+    private Context context;
+    private Data data;
+
+    public MapDataWriter(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public void writeArrayRecord(Object[] record) {
+      writeContent(record);
+    }
+
+    @Override
+    public void writeCsvRecord(String csv) {
+      writeContent(csv);
+    }
+
+    private void writeContent(Object content) {
+      if (data == null) {
+        data = new Data();
+      }
+
+      data.setContent(content);
+      try {
+        context.write(data, NullWritable.get());
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0013, e);
+      }
+    }
+  }
+
+}

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * An output format for MapReduce job.
+ */
+public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopNullOutputFormat.class.getName());
+
+  @Override
+  public void checkOutputSpecs(JobContext context) {
+    // do nothing
+  }
+
+  @Override
+  public RecordWriter<Data, NullWritable> getRecordWriter(
+      TaskAttemptContext context) {
+    SqoopOutputFormatLoadExecutor executor =
+        new SqoopOutputFormatLoadExecutor(context);
+    return executor.getRecordWriter();
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    // return an output committer that does nothing
+    return new NullOutputCommitter();
+  }
+
+  class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void setupJob(JobContext jobContext) { }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+  }
+
+}

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.EtlContext;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+public class SqoopOutputFormatLoadExecutor {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopOutputFormatLoadExecutor.class.getName());
+
+  private boolean readerFinished;
+  private boolean writerFinished;
+  private Data data;
+  private JobContext context;
+  private SqoopRecordWriter producer;
+  private ConsumerThread consumer;
+
+  public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
+    data = new Data();
+    context = jobctx;
+    producer = new SqoopRecordWriter();
+    consumer = new ConsumerThread();
+  }
+
+  public RecordWriter<Data, NullWritable> getRecordWriter() {
+    consumer.setDaemon(true);
+    consumer.start();
+    return producer;
+  }
+
+  public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
+    @Override
+    public void write(Data key, NullWritable value) {
+      synchronized (data) {
+        if (readerFinished) {
+          consumer.checkException();
+          return;
+        }
+
+        try {
+          if (!data.isEmpty()) {
+            // wait for reader to consume data
+            data.wait();
+          }
+
+          data.setContent(key.getContent());
+
+          // notify reader that the data is ready
+          data.notify();
+
+        } catch (InterruptedException e) {
+          // inform reader that writer is finished
+          writerFinished = true;
+
+          // unlock reader so it can continue
+          data.notify();
+
+          // throw exception
+          throw new SqoopException(CoreError.CORE_0015, e);
+        }
+      }
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) {
+      synchronized (data) {
+        if (readerFinished) {
+          consumer.checkException();
+          return;
+        }
+
+        try {
+          if (!data.isEmpty()) {
+            // wait for reader to consume data
+            data.wait();
+          }
+
+          writerFinished = true;
+
+          data.notify();
+
+        } catch (InterruptedException e) {
+          // inform reader that writer is finished
+          writerFinished = true;
+
+          // unlock reader so it can continue
+          data.notify();
+
+          // throw exception
+          throw new SqoopException(CoreError.CORE_0015, e);
+        }
+      }
+    }
+  }
+
+  public class OutputFormatDataReader extends DataReader {
+    @Override
+    public Object[] readArrayRecord() {
+      return (Object[])readContent();
+    }
+
+    @Override
+    public String readCsvRecord() {
+      return (String)readContent();
+    }
+
+    private Object readContent() {
+      synchronized (data) {
+        if (writerFinished) {
+          return null;
+        }
+
+        try {
+          if (data.isEmpty()) {
+            // wait for writer to produce data
+            data.wait();
+          }
+
+          Object content = data.getContent();
+          data.setContent(null);
+
+          // notify writer that data is consumed
+          data.notify();
+
+          return content;
+
+        } catch (InterruptedException e) {
+          // inform writer that reader is finished
+          readerFinished = true;
+
+          // unlock writer so it can continue
+          data.notify();
+
+          // throw exception
+          throw new SqoopException(CoreError.CORE_0016, e);
+        }
+      }
+    }
+  }
+
+  public class ConsumerThread extends Thread {
+    private SqoopException exception = null;
+
+    public void checkException() {
+      if (exception != null) {
+        throw exception;
+      }
+    }
+
+    @Override
+    public void run() {
+      DataReader reader = new OutputFormatDataReader();
+
+      Configuration conf = context.getConfiguration();
+
+      try {
+        String loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
+        Class<?> clz = ClassLoadingUtils.loadClass(loaderName);
+        if (clz == null) {
+          throw new SqoopException(CoreError.CORE_0009, loaderName);
+        }
+
+        Loader loader;
+        try {
+          loader = (Loader) clz.newInstance();
+        } catch (Exception e) {
+          throw new SqoopException(CoreError.CORE_0010, loaderName, e);
+        }
+
+        try {
+          loader.run(new EtlContext(conf), reader);
+
+        } catch (Throwable t) {
+          throw new SqoopException(CoreError.CORE_0018, t);
+        }
+
+      } catch (SqoopException e) {
+        exception = e;
+      }
+
+      synchronized (data) {
+        // inform writer that reader is finished
+        readerFinished = true;
+
+        // unlock writer so it can continue
+        data.notify();
+
+        // if no exception happens yet
+        if (exception == null && !writerFinished) {
+          // create exception if data are not all consumed
+          exception = new SqoopException(CoreError.CORE_0019);
+        }
+
+        // throw deferred exception if exist
+        if (exception != null) {
+          throw exception;
+        }
+      }
+    }
+  }
+
+}
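
The executor hands records from the Hadoop-side SqoopRecordWriter to the Loader
on the consumer thread through the single shared Data slot (wait/notify). A
cooperating Loader simply reads until null, which is how the reader signals
that the writer has finished. A hedged sketch (the class name is hypothetical,
and it assumes the extractor produced CSV records):

  public class LoggingLoader extends Loader {
    @Override
    public void run(Context context, DataReader reader) {
      String record;
      // readCsvRecord() blocks until the writer produces a record and
      // returns null once the writer side is finished.
      while ((record = reader.readCsvRecord()) != null) {
        System.out.println(record);
      }
      // Draining every record avoids the deferred CORE_0019
      // ("Data have not been completely consumed yet") exception.
    }
  }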

Added: sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java (added)
+++ sqoop/branches/sqoop2/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.utils.ClassLoadingUtils;
+
+/**
+ * An input split to be read.
+ */
+public class SqoopSplit extends InputSplit implements Writable {
+
+  private Partition partition;
+
+  public void setPartition(Partition partition) {
+    this.partition = partition;
+  }
+
+  public Partition getPartition() {
+    return partition;
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return new String[] {};
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read Partition class name
+    String className = in.readUTF();
+    // instantiate Partition object
+    Class<?> clz = ClassLoadingUtils.loadClass(className);
+    if (clz == null) {
+      throw new SqoopException(CoreError.CORE_0009, className);
+    }
+    try {
+      partition = (Partition) clz.newInstance();
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0010, className, e);
+    }
+    // read Partition object content
+    partition.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write Partition class name
+    out.writeUTF(partition.getClass().getName());
+    // write Partition object content
+    partition.write(out);
+  }
+
+}
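
Because the split serializes the Partition's class name ahead of its fields,
any Partition subclass can travel through the MapReduce framework unchanged. A
round-trip sketch (illustrative only; DummyPartition from the test below stands
in for a real connector partition):

  SqoopSplit original = new SqoopSplit();
  DummyPartition partition = new DummyPartition();
  partition.setId(7);
  original.setPartition(partition);

  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  original.write(new DataOutputStream(buffer));  // class name, then fields

  SqoopSplit restored = new SqoopSplit();
  restored.readFields(new DataInputStream(
      new ByteArrayInputStream(buffer.toByteArray())));
  // restored.getPartition() is a fresh DummyPartition with id == 7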

Added: sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java?rev=1379307&view=auto
==============================================================================
--- sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java (added)
+++ sqoop/branches/sqoop2/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java Fri Aug 31 06:24:05 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.etl.Context;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import org.apache.sqoop.job.mr.SqoopSplit;
+import org.junit.Test;
+
+public class TestMapReduce {
+
+  private static final int START_ID = 1;
+  private static final int NUMBER_OF_IDS = 9;
+  private static final int NUMBER_OF_ROWS_PER_ID = 10;
+
+  @Test
+  public void testInputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    Job job = Job.getInstance(conf);
+
+    SqoopInputFormat inputformat = new SqoopInputFormat();
+    List<InputSplit> splits = inputformat.getSplits(job);
+    Assert.assertEquals(9, splits.size());
+
+    for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+      SqoopSplit split = (SqoopSplit)splits.get(id-1);
+      DummyPartition partition = (DummyPartition)split.getPartition();
+      Assert.assertEquals(id, partition.getId());
+    }
+  }
+
+  @Test
+  public void testMapper() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+
+    Job job = Job.getInstance(conf);
+    job.setInputFormatClass(SqoopInputFormat.class);
+    job.setMapperClass(SqoopMapper.class);
+    job.setMapOutputKeyClass(Data.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(DummyOutputFormat.class);
+    job.setOutputKeyClass(Data.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    boolean success = job.waitForCompletion(true);
+    Assert.assertEquals("Job failed!", true, success);
+  }
+
+  @Test
+  public void testOutputFormat() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+
+    Job job = Job.getInstance(conf);
+    job.setInputFormatClass(SqoopInputFormat.class);
+    job.setMapperClass(SqoopMapper.class);
+    job.setMapOutputKeyClass(Data.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(SqoopNullOutputFormat.class);
+    job.setOutputKeyClass(Data.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    boolean success = job.waitForCompletion(true);
+    Assert.assertEquals("Job failed!", true, success);
+  }
+
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+  }
+
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> run(Context context) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void run(Context context, Partition partition, DataWriter writer) {
+      int id = ((DummyPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
+        Object[] array = new Object[] {
+          String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
+          Integer.valueOf(id*NUMBER_OF_ROWS_PER_ID+row),
+          Double.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
+        };
+        writer.writeArrayRecord(array);
+      }
+    }
+  }
+
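+  // Output format that checks the records emitted by SqoopMapper
+  // against the sequence generated by DummyExtractor.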
+  public static class DummyOutputFormat
+      extends OutputFormat<Data, NullWritable> {
+    @Override
+    public void checkOutputSpecs(JobContext context) {
+      // do nothing
+    }
+
+    @Override
+    public RecordWriter<Data, NullWritable> getRecordWriter(
+        TaskAttemptContext context) {
+      return new DummyRecordWriter();
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+      return new DummyOutputCommitter();
+    }
+
+    public static class DummyRecordWriter
+        extends RecordWriter<Data, NullWritable> {
+      private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+      private Data data = new Data();
+
+      @Override
+      public void write(Data key, NullWritable value) {
+        Object[] record = new Object[] {
+          String.valueOf(index),
+          Integer.valueOf(index),
+          Double.valueOf(index)
+        };
+        data.setContent(record);
+        Assert.assertEquals(data.toString(), key.toString());
+        index++;
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {
+        // do nothing
+      }
+    }
+
+    public static class DummyOutputCommitter extends OutputCommitter {
+      @Override
+      public void setupJob(JobContext jobContext) { }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+    }
+  }
+
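+  // Loader that consumes records through the DataReader and checks
+  // them against the expected sequence.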
+  public static class DummyLoader extends Loader {
+    private int index = START_ID*NUMBER_OF_ROWS_PER_ID;
+    private Data expected = new Data();
+    private Data actual = new Data();
+
+    @Override
+    public void run(Context context, DataReader reader) {
+      Object[] array;
+      while ((array = reader.readArrayRecord()) != null) {
+        actual.setContent(array);
+
+        expected.setContent(new Object[] {
+          String.valueOf(index),
+          Integer.valueOf(index),
+          Double.valueOf(index)});
+        index++;
+
+        Assert.assertEquals(expected.toString(), actual.toString());
+      }
+    }
+  }
+
+}

Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Context.java Fri Aug 31 06:24:05 2012
@@ -24,4 +24,6 @@ public interface Context {
 
   public String getString(String key);
 
+  public String[] getFieldNames();
+
 }
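
The new getFieldNames() accessor lets connector code discover the schema
fields carried in the job context. A minimal sketch of how a connector
might consume it (the method and variable names here are illustrative,
not part of this commit):

    import org.apache.sqoop.job.etl.Context;

    // Illustrative only: build a comma-separated projection list from
    // the field names exposed by the new Context#getFieldNames().
    public static String buildColumnList(Context context) {
      StringBuilder columns = new StringBuilder();
      for (String field : context.getFieldNames()) {
        if (columns.length() > 0) {
          columns.append(",");
        }
        columns.append(field);
      }
      return columns.toString();
    }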

Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java Fri Aug 31 06:24:05 2012
@@ -19,20 +19,21 @@ package org.apache.sqoop.job.etl;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.IOException;
 
 /**
  * A part of the input data partitioned by the Partitioner.
  */
-public interface Partition {
+public abstract class Partition {
 
   /**
    * Deserialize the fields of this partition from input.
    */
-  public void readFields(DataInput in);
+  public abstract void readFields(DataInput in) throws IOException;
 
   /**
    * Serialize the fields of this partition to output.
    */
-  public void write(DataOutput out);
+  public abstract void write(DataOutput out) throws IOException;
 
 }
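
Partition is now an abstract class whose readFields/write methods declare
IOException, matching the Writable-style contract used by SqoopSplit.
DummyPartition in TestMapReduce above is the in-tree example; a
connector-side implementation might look like the following sketch (the
class and its fields are illustrative, not part of this commit):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.sqoop.job.etl.Partition;

    // Illustrative only: a partition carrying a numeric key range.
    public class RangePartition extends Partition {
      private long start;
      private long end;

      @Override
      public void readFields(DataInput in) throws IOException {
        start = in.readLong();
        end = in.readLong();
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeLong(start);
        out.writeLong(end);
      }
    }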

Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java Fri Aug 31 06:24:05 2012
@@ -21,6 +21,10 @@ package org.apache.sqoop.job.io;
  * An intermediate layer for passing data from the MR framework
  * to the ETL framework.
  */
-public interface DataReader {
+public abstract class DataReader {
+
+  public abstract Object[] readArrayRecord();
+
+  public abstract String readCsvRecord();
 
 }
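
DataReader grows from a marker interface into an abstract class with two
record-oriented accessors; a Loader pulls rows through it and stops when
readArrayRecord() returns null, as DummyLoader above demonstrates. A
sketch of an in-memory reader for exercising a Loader in isolation (the
class name is illustrative; at runtime the reader is supplied by
SqoopOutputFormatLoadExecutor):

    import java.util.Iterator;
    import org.apache.sqoop.job.io.DataReader;

    // Illustrative only: serves pre-built rows to a Loader under test.
    public class ListBackedDataReader extends DataReader {
      private final Iterator<Object[]> rows;

      public ListBackedDataReader(Iterable<Object[]> rows) {
        this.rows = rows.iterator();
      }

      @Override
      public Object[] readArrayRecord() {
        // Returning null signals end of data, matching the contract
        // exercised by DummyLoader in TestMapReduce.
        return rows.hasNext() ? rows.next() : null;
      }

      @Override
      public String readCsvRecord() {
        throw new UnsupportedOperationException("array records only");
      }
    }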

Modified: sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java
URL: http://svn.apache.org/viewvc/sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java?rev=1379307&r1=1379306&r2=1379307&view=diff
==============================================================================
--- sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java (original)
+++ sqoop/branches/sqoop2/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java Fri Aug 31 06:24:05 2012
@@ -21,6 +21,10 @@ package org.apache.sqoop.job.io;
  * An intermediate layer for passing data from the ETL framework
  * to the MR framework.
  */
-public interface DataWriter {
+public abstract class DataWriter {
+
+  public abstract void writeArrayRecord(Object[] array);
+
+  public abstract void writeCsvRecord(String csv);
 
 }
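
DataWriter is the mirror image on the extract side: SqoopMapper hands one
to the Extractor, which pushes rows through writeArrayRecord() as
DummyExtractor above does. A sketch of a buffering writer for testing an
Extractor without a running job (the class name is illustrative, not part
of this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.sqoop.job.io.DataWriter;

    // Illustrative only: buffers extracted records in memory.
    public class CollectingDataWriter extends DataWriter {
      private final List<Object[]> collected = new ArrayList<Object[]>();

      @Override
      public void writeArrayRecord(Object[] array) {
        collected.add(array);
      }

      @Override
      public void writeCsvRecord(String csv) {
        collected.add(new Object[] { csv });
      }

      public List<Object[]> getCollected() {
        return collected;
      }
    }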