Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/03 01:56:10 UTC

git commit: SQOOP-1560: Sqoop2: Move matcher out of Schema

Repository: sqoop
Updated Branches:
  refs/heads/SQOOP-1367 48ce535ba -> 095791bfc


SQOOP-1560: Sqoop2: Move matcher out of Schema

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/095791bf
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/095791bf
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/095791bf

Branch: refs/heads/SQOOP-1367
Commit: 095791bfc0b74db3bed169735825c24cd54ebc00
Parents: 48ce535
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Oct 2 16:55:29 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Oct 2 16:55:29 2014 -0700

----------------------------------------------------------------------
 .../idf/CSVIntermediateDataFormat.java          | 134 ++++-----
 .../connector/idf/IntermediateDataFormat.java   |   9 +-
 .../connector/idf/matcher/AbstractMatcher.java  |  62 -----
 .../connector/idf/matcher/LocationMatcher.java  |  82 ------
 .../connector/idf/matcher/NameMatcher.java      |  69 -----
 .../connector/matcher/LocationMatcher.java      |  78 ++++++
 .../apache/sqoop/connector/matcher/Matcher.java |  69 +++++
 .../sqoop/connector/matcher/MatcherError.java   |  41 +++
 .../sqoop/connector/matcher/MatcherFactory.java |  30 ++
 .../sqoop/connector/matcher/NameMatcher.java    |  74 +++++
 .../idf/TestCSVIntermediateDataFormat.java      | 160 ++---------
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  43 +--
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  16 +-
 .../java/org/apache/sqoop/job/JobUtils.java     |  12 +-
 .../org/apache/sqoop/job/TestMapReduce.java     |  12 +-
 .../java/org/apache/sqoop/job/TestMatching.java | 275 +++++++++++++++++++
 16 files changed, 681 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 2a49221..02d1a51 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -22,12 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.matcher.AbstractMatcher;
-import org.apache.sqoop.connector.idf.matcher.LocationMatcher;
-import org.apache.sqoop.connector.idf.matcher.NameMatcher;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
-import org.apache.sqoop.schema.SchemaMatchOption;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -41,7 +36,6 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 import java.util.regex.Matcher;
 
 public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
@@ -71,8 +65,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
   private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
 
-  private Schema fromSchema;
-  private Schema toSchema;
+  private Schema schema;
 
   /**
    * {@inheritDoc}
@@ -94,11 +87,11 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    * {@inheritDoc}
    */
   @Override
-  public void setFromSchema(Schema schema) {
+  public void setSchema(Schema schema) {
     if(schema == null) {
       return;
     }
-    this.fromSchema = schema;
+    this.schema = schema;
     List<Column> columns = schema.getColumns();
     int i = 0;
     for(Column col : columns) {
@@ -112,19 +105,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   }
 
   /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void setToSchema(Schema schema) {
-    if(schema == null) {
-      return;
-    }
-    this.toSchema = schema;
-  }
-
-
-
-  /**
    * Custom CSV parser that honors quoting and escaped quotes.
    * All other escaping is handled elsewhere.
    *
@@ -180,69 +160,68 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
   /**
    * {@inheritDoc}
-   *
-   * The CSV data is ordered according to the fromSchema. We "translate" it to the TO schema.
-   * We currently have 3 methods of matching fields in one schema to another:
-   * - by location
-   * - by name
-   * - user-defined matching
-   *
-   * If one schema exists (either to or from) and the other is empty
-   * We'll match fields based on location.
-   * If both schemas exist, we'll match names of fields.
-   *
-   * In the future, we may want to let users choose the method
-   * Currently nothing is implemented for user-defined matching
    */
   @Override
   public Object[] getObjectData() {
+    if (schema.isEmpty()) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+    }
+
     String[] fields = getFields();
+
     if (fields == null) {
       return null;
     }
 
-    if (fromSchema == null || toSchema == null || (toSchema.isEmpty() && fromSchema.isEmpty())) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+    if (fields.length != schema.getColumns().size()) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+          "The data " + getTextData() + " has the wrong number of fields.");
     }
 
-    AbstractMatcher matcher = getMatcher(fromSchema,toSchema);
-    String[] outFields =  matcher.getMatchingData(fields, fromSchema, toSchema);
-    Object[] out =  new Object[outFields.length];
-
-    int i = 0;
-
-    // After getting back the data in order that matches the output schema
-    // We need to un-do the CSV escaping
-    for (Column col: matcher.getMatchingSchema(fromSchema,toSchema).getColumns()) {
-      Type colType = col.getType();
-      if (outFields[i] == null) {
+    Object[] out = new Object[fields.length];
+    Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
+    for (int i = 0; i < fields.length; i++) {
+      Type colType = cols[i].getType();
+      if (fields[i].equals("NULL")) {
         out[i] = null;
         continue;
       }
-      if (colType == Type.TEXT) {
-        out[i] = unescapeStrings(outFields[i]);
-      } else if (colType == Type.BINARY) {
-        out[i] = unescapeByteArray(outFields[i]);
-      } else if (colType == Type.FIXED_POINT) {
-        Long byteSize = ((FixedPoint) col).getByteSize();
-        if (byteSize != null && byteSize <= Integer.SIZE) {
-          out[i] = Integer.valueOf(outFields[i]);
-        } else {
-          out[i] = Long.valueOf(outFields[i]);
-        }
-      } else if (colType == Type.FLOATING_POINT) {
-        Long byteSize = ((FloatingPoint) col).getByteSize();
-        if (byteSize != null && byteSize <= Float.SIZE) {
-          out[i] = Float.valueOf(outFields[i]);
-        } else {
-          out[i] = Double.valueOf(outFields[i]);
-        }
-      } else if (colType == Type.DECIMAL) {
-        out[i] = new BigDecimal(outFields[i]);
-      } else {
-        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
+
+      Long byteSize;
+      switch(colType) {
+        case TEXT:
+          out[i] = unescapeStrings(fields[i]);
+          break;
+        case BINARY:
+          out[i] = unescapeByteArray(fields[i]);
+          break;
+        case FIXED_POINT:
+          byteSize = ((FixedPoint) cols[i]).getByteSize();
+          if (byteSize != null && byteSize <= Integer.SIZE) {
+            out[i] = Integer.valueOf(fields[i]);
+          } else {
+            out[i] = Long.valueOf(fields[i]);
+          }
+          break;
+        case FLOATING_POINT:
+          byteSize = ((FloatingPoint) cols[i]).getByteSize();
+          if (byteSize != null && byteSize <= Float.SIZE) {
+            out[i] = Float.valueOf(fields[i]);
+          } else {
+            out[i] = Double.valueOf(fields[i]);
+          }
+          break;
+        case DECIMAL:
+          out[i] = new BigDecimal(fields[i]);
+          break;
+        case DATE:
+        case DATE_TIME:
+        case BIT:
+          out[i] = fields[i];
+          break;
+        default:
+          throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
       }
-      i++;
     }
     return out;
   }
@@ -380,15 +359,4 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   public String toString() {
     return data;
   }
-
-  private AbstractMatcher getMatcher(Schema fromSchema, Schema toSchema) {
-    if (toSchema.isEmpty() || fromSchema.isEmpty()) {
-      return new LocationMatcher();
-    } else {
-      return new NameMatcher();
-    }
-
-
-  }
-
 }
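
For reference, a minimal usage sketch of the reworked single-schema API (the driver class is hypothetical; CSVIntermediateDataFormat, Schema, and the column types are the ones touched above):

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class CsvIdfSketch {
  public static void main(String[] args) {
    // One schema now drives both parsing and type conversion.
    Schema schema = new Schema("example")
        .addColumn(new FixedPoint("id"))
        .addColumn(new Text("name"));

    IntermediateDataFormat<String> idf = new CSVIntermediateDataFormat();
    idf.setSchema(schema);

    // FIXED_POINT becomes Integer or Long depending on byteSize (Long when
    // byteSize is unset), TEXT is unescaped, and the literal NULL maps to null.
    idf.setTextData("10,'jarcec'");
    Object[] fields = idf.getObjectData();
    System.out.println(fields[0] + " / " + fields[1]);   // 10 / jarcec
  }
}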

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index d98b779..5ef6fc6 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -111,14 +111,7 @@ public abstract class IntermediateDataFormat<T> {
    *
    * @param schema - the schema used for reading data
    */
-  public abstract void setFromSchema(Schema schema);
-
-  /**
-   * Set the schema for writing data.
-   *
-   * @param schema - the schema used for writing data
-   */
-  public abstract void setToSchema(Schema schema);
+  public abstract void setSchema(Schema schema);
 
   /**
    * Serialize the fields of this object to <code>out</code>.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
deleted file mode 100644
index e6b2316..0000000
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.idf.matcher;
-
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Column;
-
-public abstract class AbstractMatcher {
-
-  //NOTE: This is currently tightly coupled to the CSV idf. We'll need refactoring after adding additional formats
-  //NOTE: There's is a very blatant special case of empty schemas that seem to apply only to HDFS.
-
-  /**
-   *
-   * @param fields
-   * @param fromSchema
-   * @param toSchema
-   * @return Return the data in "fields" converted from matching the fromSchema to matching the toSchema.
-   * Right not "converted" means re-ordering if needed and handling nulls.
-   */
-  abstract public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema);
-
-  /***
-   *
-   * @param fromSchema
-   * @param toSchema
-   * @return return a schema with which to read the output data
-   * This always returns the toSchema (since this is used when getting output data), unless its empty
-   */
-  public Schema getMatchingSchema(Schema fromSchema, Schema toSchema) {
-    if (toSchema.isEmpty()) {
-      return fromSchema;
-    } else {
-      return toSchema;
-    }
-
-  }
-
-  protected boolean isNull(String value) {
-    if (value.equals("NULL") || value.equals("null") || value.equals("'null'") || value.isEmpty()) {
-      return true;
-    }
-    return false;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
deleted file mode 100644
index 938a5df..0000000
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.idf.matcher;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
-import org.apache.sqoop.schema.SchemaMatchOption;
-import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.FloatingPoint;
-import org.apache.sqoop.schema.type.Type;
-
-import java.math.BigDecimal;
-import java.util.Iterator;
-
-
-/**
- * Convert data according to FROM schema to data according to TO schema
- * This is done based on column location
- * So data in first column in FROM goes into first column in TO, etc
- * If TO schema has more fields and they are "nullable", the value will be set to null
- * If TO schema has extra non-null fields, we'll throw an exception
- */
-public class LocationMatcher extends AbstractMatcher {
-
-  public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
-  @Override
-  public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
-
-    String[] out = new String[toSchema.getColumns().size()];
-
-    int i = 0;
-
-    if (toSchema.isEmpty()) {
-      // If there's no destination schema, no need to convert anything
-      // Just use the original data
-      return fields;
-    }
-
-    for (Column col: toSchema.getColumns())
-    {
-      if (i < fields.length) {
-        if (isNull(fields[i])) {
-          out[i] = null;
-        } else {
-          out[i] = fields[i];
-        }
-      }
-      // We ran out of fields before we ran out of schema
-      else {
-        if (!col.getNullable()) {
-          throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null");
-        } else {
-          LOG.warn("Column " + col + " has no matching source column. Will be ignored. ");
-          out[i] = null;
-        }
-      }
-      i++;
-    }
-    return out;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
deleted file mode 100644
index 417c85b..0000000
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.idf.matcher;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaError;
-import org.apache.sqoop.schema.type.Column;
-
-import java.util.HashMap;
-
-public class NameMatcher extends AbstractMatcher {
-  public static final Logger LOG = Logger.getLogger(NameMatcher.class);
-
-  @Override
-  public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
-    String[] out = new String[toSchema.getColumns().size()];
-
-    HashMap<String,Column> colNames = new HashMap<String, Column>();
-
-    for (Column fromCol: fromSchema.getColumns()) {
-      colNames.put(fromCol.getName(), fromCol);
-    }
-
-    int toIndex = 0;
-
-    for (Column toCol: toSchema.getColumns()) {
-      Column fromCol = colNames.get(toCol.getName());
-
-      if (fromCol != null) {
-        int fromIndex = fromSchema.getColumns().indexOf(fromCol);
-        if (isNull(fields[fromIndex])) {
-          out[toIndex] = null;
-        } else {
-          out[toIndex] = fields[fromIndex];
-        }
-      } else {
-        //column exists in TO schema but not in FROM schema
-        if (toCol.getNullable() == false) {
-          throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null");
-        } else {
-          LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. ");
-          out[toIndex] = null;
-        }
-      }
-
-      toIndex++;
-    }
-
-  return out;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
new file mode 100644
index 0000000..58b709e
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
+
+
+/**
+ * Converts data matching the FROM schema into data matching the TO schema.
+ * Matching is done by column location: data in the first FROM column goes
+ * into the first TO column, and so on.
+ * If the TO schema has extra nullable columns, their values are set to null.
+ * If the TO schema has extra non-nullable columns, an exception is thrown.
+ */
+public class LocationMatcher extends Matcher {
+
+  public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
+
+  public LocationMatcher(Schema from, Schema to) {
+    super(from, to);
+  }
+
+  @Override
+  public Object[] getMatchingData(Object[] fields) {
+
+    Object[] out = new Object[getToSchema().getColumns().size()];
+
+    int i = 0;
+
+    if (getToSchema().isEmpty()) {
+      // If there's no destination schema, no need to convert anything
+      // Just use the original data
+      return fields;
+    }
+
+    for (Column col: getToSchema().getColumns()) {
+      if (i < fields.length) {
+        if (isNull(fields[i])) {
+          out[i] = null;
+        } else {
+          out[i] = fields[i];
+        }
+      }
+      // We ran out of fields before we ran out of schema
+      else {
+        if (!col.getNullable()) {
+          throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null");
+        } else {
+          LOG.warn("Column " + col + " has no matching source column. Will be ignored. ");
+          out[i] = null;
+        }
+      }
+      i++;
+    }
+    return out;
+  }
+
+
+}
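
A usage sketch of the relocated LocationMatcher, mirroring the removed test testMatchingFewerFromColumns (the driver class and values are illustrative):

import org.apache.sqoop.connector.matcher.LocationMatcher;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class LocationMatcherSketch {
  public static void main(String[] args) {
    Schema from = new Schema("FROM")
        .addColumn(new FixedPoint("1"))
        .addColumn(new FixedPoint("2"));
    Schema to = new Schema("TO")
        .addColumn(new FixedPoint("1"))
        .addColumn(new FixedPoint("2"))
        .addColumn(new Text("3"));   // extra column, nullable by default

    Matcher matcher = new LocationMatcher(from, to);
    // First two values are copied by position; the third TO column has no
    // source, so it is padded with null (a non-nullable column would throw).
    Object[] out = matcher.getMatchingData(new Object[]{10L, 34L});
    System.out.println(java.util.Arrays.toString(out));   // [10, 34, null]
  }
}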

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
new file mode 100644
index 0000000..8ab1318
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+
+public abstract class Matcher {
+
+  private final Schema fromSchema;
+  private final Schema toSchema;
+
+  public Matcher(Schema fromSchema, Schema toSchema) {
+    if (fromSchema.isEmpty() && toSchema.isEmpty()) {
+      throw new SqoopException(MatcherError.MATCHER_0000, "Neither a FROM nor a TO schema has been provided.");
+    } else if (toSchema.isEmpty()) {
+      this.fromSchema = fromSchema;
+      this.toSchema = fromSchema;
+    } else if (fromSchema.isEmpty()) {
+      this.fromSchema = toSchema;
+      this.toSchema = toSchema;
+    } else {
+      this.fromSchema = fromSchema;
+      this.toSchema = toSchema;
+    }
+  }
+
+  /**
+   *
+   * @param fields
+   * @return the data in "fields", converted from the fromSchema layout to the toSchema layout.
+   * Right now, "converted" means re-ordering fields as needed and handling nulls.
+   */
+  abstract public Object[] getMatchingData(Object[] fields);
+
+  public Schema getFromSchema() {
+    return fromSchema;
+  }
+
+  public Schema getToSchema() {
+    return toSchema;
+  }
+
+  protected boolean isNull(Object value) {
+    if (value == null || value.equals("NULL")
+        || value.equals("null") || value.equals("'null'")
+        || value.equals("")) {
+      return true;
+    }
+    return false;
+  }
+
+
+}
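
The constructor's empty-schema fallback in a nutshell (a sketch; the concrete subclass choice here is arbitrary): when exactly one side is empty, the populated schema stands in for both sides, and two empty schemas fail fast.

import org.apache.sqoop.connector.matcher.LocationMatcher;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;

public class MatcherFallbackSketch {
  public static void main(String[] args) {
    Schema from = new Schema("FROM").addColumn(new FixedPoint("1"));
    Schema emptyTo = new Schema("TO");   // no columns, so isEmpty() is true

    Matcher m = new LocationMatcher(from, emptyTo);
    // The empty TO side was replaced by the FROM schema:
    System.out.println(m.getToSchema() == from);   // true

    // Two empty schemas are rejected with MatcherError.MATCHER_0000:
    // new LocationMatcher(new Schema("A"), new Schema("B"));
  }
}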

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java
new file mode 100644
index 0000000..577b091
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum MatcherError implements ErrorCode {
+  MATCHER_0000("To few Schemas provided."),
+
+  ;
+
+  private final String message;
+
+  private MatcherError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java
new file mode 100644
index 0000000..ae89e6c
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.sqoop.schema.Schema;
+
+public class MatcherFactory {
+  public static Matcher getMatcher(Schema fromSchema, Schema toSchema) {
+    if (toSchema.isEmpty() || fromSchema.isEmpty()) {
+      return new LocationMatcher(fromSchema, toSchema);
+    } else {
+      return new NameMatcher(fromSchema, toSchema);
+    }
+  }
+}
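
The factory's selection rule, exercised directly (a sketch; the schemas are illustrative): any empty side selects positional matching, two populated sides select name-based matching.

import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;

public class MatcherFactorySketch {
  public static void main(String[] args) {
    Schema populated = new Schema("S").addColumn(new FixedPoint("1"));
    Schema empty = new Schema("E");

    Matcher byLocation = MatcherFactory.getMatcher(populated, empty);
    Matcher byName = MatcherFactory.getMatcher(populated, populated);
    System.out.println(byLocation.getClass().getSimpleName());   // LocationMatcher
    System.out.println(byName.getClass().getSimpleName());       // NameMatcher
  }
}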

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
new file mode 100644
index 0000000..69d5ebd
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
+
+import java.util.HashMap;
+
+public class NameMatcher extends Matcher {
+
+  public static final Logger LOG = Logger.getLogger(NameMatcher.class);
+
+  public NameMatcher(Schema from, Schema to) {
+    super(from, to);
+  }
+
+  @Override
+  public Object[] getMatchingData(Object[] fields) {
+    Object[] out = new Object[getToSchema().getColumns().size()];
+
+    HashMap<String,Column> colNames = new HashMap<String, Column>();
+
+    for (Column fromCol: getFromSchema().getColumns()) {
+      colNames.put(fromCol.getName(), fromCol);
+    }
+
+    int toIndex = 0;
+
+    for (Column toCol: getToSchema().getColumns()) {
+      Column fromCol = colNames.get(toCol.getName());
+
+      if (fromCol != null) {
+        int fromIndex = getFromSchema().getColumns().indexOf(fromCol);
+        if (isNull(fields[fromIndex])) {
+          out[toIndex] = null;
+        } else {
+          out[toIndex] = fields[fromIndex];
+        }
+      } else {
+        //column exists in TO schema but not in FROM schema
+        if (toCol.getNullable() == false) {
+          throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null");
+        } else {
+          LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. ");
+          out[toIndex] = null;
+        }
+      }
+
+      toIndex++;
+    }
+
+  return out;
+  }
+
+}
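
A sketch mirroring the removed test testWithSomeNonMatchingFields: columns are matched by name rather than position, and a TO column with no FROM counterpart becomes null when nullable.

import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.NameMatcher;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;

public class NameMatcherSketch {
  public static void main(String[] args) {
    Schema from = new Schema("FROM")
        .addColumn(new FixedPoint("1"))
        .addColumn(new FixedPoint("2"))
        .addColumn(new FixedPoint("3"));
    Schema to = new Schema("TO")
        .addColumn(new FixedPoint("2"))
        .addColumn(new FixedPoint("3"))
        .addColumn(new FixedPoint("4"));   // no FROM column named "4"

    Matcher matcher = new NameMatcher(from, to);
    // "2" and "3" are found by name and re-ordered; "4" has no source column
    // and is nullable, so it becomes null.
    Object[] out = matcher.getMatchingData(new Object[]{10L, 34L, 50L});
    System.out.println(java.util.Arrays.toString(out));   // [34, 50, null]
  }
}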

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 3954039..765bedd 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -20,7 +20,6 @@ package org.apache.sqoop.connector.idf;
 
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.SchemaMatchOption;
 import org.apache.sqoop.schema.type.Binary;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.Text;
@@ -40,8 +39,6 @@ public class TestCSVIntermediateDataFormat {
 
   private IntermediateDataFormat<?> data;
 
-  private Schema emptySchema = new Schema("empty");
-
   @Before
   public void setUp() {
     data = new CSVIntermediateDataFormat();
@@ -73,7 +70,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setFromSchema(schema);
+    data.setSchema(schema);
     data.setTextData(null);
 
     Object[] out = data.getObjectData();
@@ -90,7 +87,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setFromSchema(schema);
+    data.setSchema(schema);
     data.setTextData("");
 
     data.getObjectData();
@@ -110,8 +107,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
 
-    data.setFromSchema(schema);
-    data.setToSchema(emptySchema);
+    data.setSchema(schema);
     data.setTextData(testData);
 
     Object[] out = data.getObjectData();
@@ -120,7 +116,7 @@ public class TestCSVIntermediateDataFormat {
     assertEquals(new Long(34),out[1]);
     assertEquals("54",out[2]);
     assertEquals("random data",out[3]);
-    assertEquals(-112, ((byte[])out[4])[0]);
+    assertEquals(-112, ((byte[]) out[4])[0]);
     assertEquals(54, ((byte[])out[4])[1]);
     assertEquals("\n", out[5].toString());
   }
@@ -134,7 +130,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setFromSchema(schema);
+    data.setSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -164,8 +160,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setFromSchema(schema);
-    data.setToSchema(emptySchema);
+    data.setSchema(schema);
 
     Object[] in = new Object[6];
     in[0] = new Long(10);
@@ -188,8 +183,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new Text("1"));
 
-    data.setFromSchema(schema);
-    data.setToSchema(emptySchema);
+    data.setSchema(schema);
 
     char[] allCharArr = new char[256];
     for(int i = 0; i < allCharArr.length; ++i) {
@@ -212,148 +206,30 @@ public class TestCSVIntermediateDataFormat {
   public void testByteArrayFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Binary("1"));
-    data.setFromSchema(schema);
-    data.setToSchema(emptySchema);
+    data.setSchema(schema);
 
     byte[] allCharByteArr = new byte[256];
-    for(int i = 0; i < allCharByteArr.length; ++i) {
-      allCharByteArr[i] = (byte)i;
+    for (int i = 0; i < allCharByteArr.length; ++i) {
+      allCharByteArr[i] = (byte) i;
     }
 
     Object[] in = {allCharByteArr};
     Object[] inCopy = new Object[1];
-    System.arraycopy(in,0,inCopy,0,in.length);
+    System.arraycopy(in, 0, inCopy, 0, in.length);
 
     // Modifies the input array, so we use the copy to confirm
     data.setObjectData(in);
     assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
   }
 
-  /**
-   * Note that we don't have an EmptyTo matching test
-   * Because most tests above have empty "to" schema
-   */
-  @Test
-  public void testMatchingEmptyFrom() {
-
-    data.setFromSchema(emptySchema);
-
-    Schema toSchema = new Schema("To");
-    toSchema.addColumn(new FixedPoint("1"))
-            .addColumn(new FixedPoint("2"));
-    data.setToSchema(toSchema);
-
-    Object[] in = new Object[2];
-    in[0] = new Long(10);
-    in[1] = new Long(34);
-
-    Object[] out = new Object[2];
-    out[0] = new Long(10);
-    out[1] = new Long(34);
-
-    data.setObjectData(in);
-
-    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
-  }
-
   @Test(expected=SqoopException.class)
-  public void testMatchingTwoEmptySchema() {
-    data.setFromSchema(emptySchema);
-    data.setToSchema(emptySchema);
-
-    Object[] in = new Object[2];
-    in[0] = new Long(10);
-    in[1] = new Long(34);
-
-    data.setObjectData(in);
-
-    data.getObjectData();
-  }
-
-  @Test
-  public void testMatchingFewerFromColumns(){
-    Schema fromSchema = new Schema("From");
-    fromSchema.addColumn(new FixedPoint("1"))
-            .addColumn(new FixedPoint("2"));
-    data.setFromSchema(fromSchema);
-
-    Schema toSchema = new Schema("To");
-    toSchema.addColumn(new FixedPoint("1"))
-            .addColumn(new FixedPoint("2"))
-            .addColumn(new Text("3"));
-    data.setToSchema(toSchema);
-
-    Object[] in = new Object[2];
-    in[0] = new Long(10);
-    in[1] = new Long(34);
-
-    Object[] out = new Object[3];
-    out[0] = new Long(10);
-    out[1] = new Long(34);
-    out[2] = null;
-
-    data.setObjectData(in);
-
-    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
-  }
-
-  @Test
-  public void testMatchingFewerToColumns(){
-    Schema fromSchema = new Schema("From");
-    fromSchema.addColumn(new FixedPoint("1"))
-            .addColumn(new FixedPoint("2"))
-            .addColumn(new FixedPoint("3"));
-    data.setFromSchema(fromSchema);
-
-    Schema toSchema = new Schema("To");
-    toSchema.addColumn(new FixedPoint("1"))
-            .addColumn(new FixedPoint("2"));
-    data.setToSchema(toSchema);
-
-    Object[] in = new Object[3];
-    in[0] = new Long(10);
-    in[1] = new Long(34);
-    in[2] = new Long(50);
-
-    Object[] out = new Object[2];
-    out[0] = new Long(10);
-    out[1] = new Long(34);
-
-
-    data.setObjectData(in);
-
-    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
-  }
-
-
-  @Test
-  public void testWithSomeNonMatchingFields(){
-
-    Schema fromSchema = new Schema("From");
-    fromSchema.addColumn(new FixedPoint("1"))
-            .addColumn(new FixedPoint("2"))
-            .addColumn(new FixedPoint("3"));
-    data.setFromSchema(fromSchema);
-
-    Schema toSchema = new Schema("From");
-    toSchema.addColumn(new FixedPoint("2"))
-            .addColumn(new FixedPoint("3"))
-            .addColumn(new FixedPoint("4"));
-    data.setToSchema(toSchema);
-
-    Object[] in = new Object[3];
-    in[0] = new Long(10);
-    in[1] = new Long(34);
-    in[2] = new Long(50);
-
-    Object[] out = new Object[3];
-    out[0] = new Long(34);
-    out[1] = new Long(50);
-    out[2] = null;
-
-    data.setObjectData(in);
+  public void testEmptySchema() {
+    String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+        + ",'\\n'";
+    Schema schema = new Schema("Test");
+    data.setSchema(schema);
+    data.setTextData(testData);
 
-    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+    data.getObjectData();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 8c88d52..03d84d4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -29,13 +29,14 @@ import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.matcher.Matcher;
+import org.apache.sqoop.connector.matcher.MatcherFactory;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.utils.ClassUtils;
@@ -54,8 +55,9 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
    * Service for reporting progress to mapreduce.
    */
   private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
-  private IntermediateDataFormat<String> dataFormat = null;
-  private SqoopWritable dataOut = null;
+  private IntermediateDataFormat<String> fromDataFormat = null;
+  private IntermediateDataFormat<String> toDataFormat = null;
+  private Matcher matcher;
 
   @Override
   public void run(Context context) throws IOException, InterruptedException {
@@ -64,19 +66,17 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
-
-
-    Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
-    Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+    matcher = MatcherFactory.getMatcher(
+        ConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
+        ConfigurationUtils.getConnectorSchema(Direction.TO, conf));
 
     String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
-    dataFormat = (IntermediateDataFormat<String>) ClassUtils
+    fromDataFormat = (IntermediateDataFormat<String>) ClassUtils
         .instantiate(intermediateDataFormatName);
-
-    dataFormat.setFromSchema(fromSchema);
-    dataFormat.setToSchema(toSchema);
-
-    dataOut = new SqoopWritable();
+    fromDataFormat.setSchema(matcher.getFromSchema());
+    toDataFormat = (IntermediateDataFormat<String>) ClassUtils
+        .instantiate(intermediateDataFormatName);
+    toDataFormat.setSchema(matcher.getToSchema());
 
     // Objects that should be passed to the Executor execution
     PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
@@ -109,36 +109,41 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
 
   private class SqoopMapDataWriter extends DataWriter {
     private Context context;
+    private SqoopWritable writable;
 
     public SqoopMapDataWriter(Context context) {
       this.context = context;
+      this.writable = new SqoopWritable();
     }
 
     @Override
     public void writeArrayRecord(Object[] array) {
-      dataFormat.setObjectData(array);
+      fromDataFormat.setObjectData(array);
       writeContent();
     }
 
     @Override
     public void writeStringRecord(String text) {
-      dataFormat.setTextData(text);
+      fromDataFormat.setTextData(text);
       writeContent();
     }
 
     @Override
     public void writeRecord(Object obj) {
-      dataFormat.setData(obj.toString());
+      fromDataFormat.setData(obj.toString());
       writeContent();
     }
 
     private void writeContent() {
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Extracted data: " + dataFormat.getTextData());
+          LOG.debug("Extracted data: " + fromDataFormat.getTextData());
         }
-        dataOut.setString(dataFormat.getTextData());
-        context.write(dataOut, NullWritable.get());
+
+        toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData()));
+
+        writable.setString(toDataFormat.getTextData());
+        context.write(writable, NullWritable.get());
       } catch (Exception e) {
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
       }
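
End to end, the map side now amounts to the following (a standalone sketch without the Mapper scaffolding; schemas and values are illustrative):

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;

public class MapSideSketch {
  public static void main(String[] args) {
    Schema from = new Schema("FROM").addColumn(new FixedPoint("1"));
    Schema to = new Schema("TO");   // empty TO falls back to the FROM schema

    Matcher matcher = MatcherFactory.getMatcher(from, to);

    IntermediateDataFormat<String> fromIdf = new CSVIntermediateDataFormat();
    fromIdf.setSchema(matcher.getFromSchema());
    IntermediateDataFormat<String> toIdf = new CSVIntermediateDataFormat();
    toIdf.setSchema(matcher.getToSchema());

    // What writeContent() does per record: translate FROM-shaped objects
    // into TO-shaped objects, then serialize them for the shuffle.
    fromIdf.setObjectData(new Object[]{10L});
    toIdf.setObjectData(matcher.getMatchingData(fromIdf.getObjectData()));
    System.out.println(toIdf.getTextData());   // 10
  }
}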

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 941b31d..1ebd3e4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -34,6 +34,8 @@ import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.matcher.Matcher;
+import org.apache.sqoop.connector.matcher.MatcherFactory;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.common.PrefixContext;
@@ -52,6 +54,7 @@ public class SqoopOutputFormatLoadExecutor {
   private volatile boolean readerFinished = false;
   private volatile boolean writerFinished = false;
   private volatile IntermediateDataFormat<String> dataFormat;
+  private Matcher matcher;
   private JobContext context;
   private SqoopRecordWriter writer;
   private Future<?> consumerFuture;
@@ -65,19 +68,18 @@ public class SqoopOutputFormatLoadExecutor {
     this.loaderName = loaderName;
     dataFormat = new CSVIntermediateDataFormat();
     writer = new SqoopRecordWriter();
+    matcher = null;
   }
 
   public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
     context = jobctx;
     writer = new SqoopRecordWriter();
+    matcher = MatcherFactory.getMatcher(
+        ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
+        ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
     dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
-      .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
-
-    Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
-    dataFormat.setFromSchema(fromSchema);
-
-    Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
-    dataFormat.setToSchema(toSchema);
+        .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
+    dataFormat.setSchema(matcher.getToSchema());
   }
 
   public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
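
On the load side only one IDF is needed, because records arriving from the mappers are already TO-shaped; a sketch under that assumption (the input string stands in for a SqoopWritable payload):

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;

public class LoadSideSketch {
  public static void main(String[] args) {
    Schema from = new Schema("FROM").addColumn(new FixedPoint("1"));
    Schema to = new Schema("TO").addColumn(new FixedPoint("1"));

    // A single IDF configured with the TO schema is enough to hand rows
    // to the Loader; no per-record matching happens here.
    Matcher matcher = MatcherFactory.getMatcher(from, to);
    IntermediateDataFormat<String> idf = new CSVIntermediateDataFormat();
    idf.setSchema(matcher.getToSchema());

    idf.setTextData("10");                  // payload of an incoming record
    Object[] row = idf.getObjectData();     // {10L}
    System.out.println(row[0]);
  }
}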

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
index b5435ff..1952cbb 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -36,14 +36,7 @@ import org.apache.sqoop.job.mr.SqoopSplit;
 
 public class JobUtils {
 
-  public static void runJob(Configuration conf)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
-        (conf.get(JobConstants.HADOOP_OUTDIR) != null) ?
-        SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
-  }
-
-  public static void runJob(Configuration conf,
+  public static boolean runJob(Configuration conf,
     Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
     Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
     Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
@@ -57,8 +50,7 @@ public class JobUtils {
     job.setOutputKeyClass(SqoopWritable.class);
     job.setOutputValueClass(NullWritable.class);
 
-    boolean success = job.waitForCompletion(true);
-    Assert.assertEquals("Job failed!", true, success);
+    return job.waitForCompletion(true);
   }
 
   private JobUtils() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 5662120..032cc11 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -54,6 +54,7 @@ import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
 import org.apache.sqoop.schema.type.Text;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -96,8 +97,10 @@ public class TestMapReduce {
 
     Job job = new Job(conf);
     ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
-        DummyOutputFormat.class);
+    ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+    boolean success = JobUtils.runJob(job.getConfiguration(),
+        SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
+    Assert.assertEquals("Job failed!", true, success);
   }
 
   @Test
@@ -116,8 +119,11 @@ public class TestMapReduce {
 
     Job job = new Job(conf);
     ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
+    ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+    boolean success = JobUtils.runJob(job.getConfiguration(),
+        SqoopInputFormat.class, SqoopMapper.class,
         SqoopNullOutputFormat.class);
+    Assert.assertEquals("Job failed!", true, success);
 
     // Make sure both destroyers get called.
     assertEquals(1, DummyFromDestroyer.count);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/095791bf/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
new file mode 100644
index 0000000..7f9a147
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+
+
+@RunWith(Parameterized.class)
+public class TestMatching {
+  private static final int START_PARTITION = 1;
+  private static final int NUMBER_OF_PARTITIONS = 1;
+  private static final int NUMBER_OF_ROWS_PER_PARTITION = 1;
+
+  private Schema from;
+  private Schema to;
+
+  public TestMatching(Schema from,
+                       Schema to)
+      throws Exception {
+    this.from = from;
+    this.to = to;
+
+    System.out.println("Testing with Schemas\n\tFROM: " + this.from + "\n\tTO: " + this.to);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+
+    Schema emptyFrom = new Schema("FROM-EMPTY");
+    Schema emptyTo = new Schema("TO-EMPTY");
+    Schema from1 = new Schema("FROM-1");
+    Schema to1 = new Schema("TO-1");
+    Schema from2 = new Schema("FROM-2");
+    Schema to2 = new Schema("TO-2");
+
+    from1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+        .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+    to1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+    from2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
+    to2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
+
+    parameters.add(new Object[]{
+        emptyFrom,
+        emptyTo
+    });
+    parameters.add(new Object[]{
+        from1,
+        emptyTo
+    });
+    parameters.add(new Object[]{
+        emptyFrom,
+        to1
+    });
+    parameters.add(new Object[]{
+        from1,
+        to1
+    });
+    parameters.add(new Object[]{
+        from2,
+        to1
+    });
+    parameters.add(new Object[]{
+        from1,
+        to2
+    });
+
+    return parameters;
+  }
+
+  @Test
+  public void testSchemaMatching() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
+    ConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
+    boolean success = JobUtils.runJob(job.getConfiguration(),
+        SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
+    if (from.getName().split("-")[1].equals("EMPTY")) {
+      if (to.getName().split("-")[1].equals("EMPTY")) {
+        Assert.assertEquals("Job succeeded!", false, success);
+      } else {
+        Assert.assertEquals("Job failed!", true, success);
+      }
+    } else {
+      if (to.getName().split("-")[1].equals("EMPTY")) {
+        Assert.assertEquals("Job failed!", true, success);
+      } else if (from.getName().split("-")[1].equals(to.getName().split("-")[1])) {
+        Assert.assertEquals("Job failed!", true, success);
+      } else {
+        Assert.assertEquals("Job succeeded!", false, success);
+      }
+    }
+  }
+
+  public static class DummyPartition extends Partition {
+    private int id;
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(id);
+    }
+
+    @Override
+    public String toString() {
+      return Integer.toString(id);
+    }
+  }
+
+  public static class DummyPartitioner extends Partitioner {
+    @Override
+    public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+        DummyPartition partition = new DummyPartition();
+        partition.setId(id);
+        partitions.add(partition);
+      }
+      return partitions;
+    }
+  }
+
+  public static class DummyExtractor extends Extractor {
+    @Override
+    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
+      int id = ((DummyPartition)partition).getId();
+      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+        context.getDataWriter().writeArrayRecord(new Object[] {
+            id * NUMBER_OF_ROWS_PER_PARTITION + row,
+            (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
+            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+      }
+    }
+
+    @Override
+    public long getRowsRead() {
+      return NUMBER_OF_ROWS_PER_PARTITION;
+    }
+  }
+
+  public static class DummyOutputFormat
+      extends OutputFormat<SqoopWritable, NullWritable> {
+    @Override
+    public void checkOutputSpecs(JobContext context) {
+      // do nothing
+    }
+
+    @Override
+    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
+        TaskAttemptContext context) {
+      return new DummyRecordWriter();
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+      return new DummyOutputCommitter();
+    }
+
+    public static class DummyRecordWriter
+        extends RecordWriter<SqoopWritable, NullWritable> {
+      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+      private Data data = new Data();
+
+      @Override
+      public void write(SqoopWritable key, NullWritable value) {
+
+        data.setContent(new Object[] {
+                index,
+                (double) index,
+                String.valueOf(index)},
+            Data.ARRAY_RECORD);
+        index++;
+
+        assertEquals(data.toString(), key.toString());
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {
+        // do nothing
+      }
+    }
+
+    public static class DummyOutputCommitter extends OutputCommitter {
+      @Override
+      public void setupJob(JobContext jobContext) { }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+    }
+  }
+}