Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 04:51:55 UTC

[26/52] [abbrv] git commit: SQOOP-1378: Sqoop2: From/To: Refactor schema

SQOOP-1378: Sqoop2: From/To: Refactor schema

This patch also changes the tools documentation.
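
For readers following along, here is a minimal sketch (not part of the patch itself) of how
the refactored API is meant to be driven. It only uses classes and methods introduced or
touched below; the schemas and values are invented for illustration:

  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.connector.idf.IntermediateDataFormat;
  import org.apache.sqoop.schema.Schema;
  import org.apache.sqoop.schema.type.FixedPoint;
  import org.apache.sqoop.schema.type.Text;

  public class FromToSchemaSketch {
    public static void main(String[] args) {
      // Instead of one schema, the IDF now carries a FROM and a TO schema.
      Schema from = new Schema("From")
          .addColumn(new FixedPoint("id"))
          .addColumn(new Text("name"));
      Schema to = new Schema("To")
          .addColumn(new Text("name"))
          .addColumn(new FixedPoint("id"));

      IntermediateDataFormat<String> idf = new CSVIntermediateDataFormat();
      idf.setFromSchema(from); // replaces the old, single setSchema(schema)
      idf.setToSchema(to);

      // Text data stays in FROM order; object data is matched to the TO schema.
      idf.setTextData("10,'gwen'");
      Object[] row = idf.getObjectData(); // {"gwen", 10L} after name-based matching
    }
  }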


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2c20d920
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2c20d920
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2c20d920

Branch: refs/heads/SQOOP-1367
Commit: 2c20d920f4ab31ae97ba57952d17677146069b5c
Parents: d015362
Author: Gwen Shapira <cs...@gmail.com>
Authored: Thu Sep 25 15:39:43 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 17:59:24 2014 -0700

----------------------------------------------------------------------
 .../apache/sqoop/job/etl/ExtractorContext.java  |  13 +-
 .../java/org/apache/sqoop/json/SchemaBean.java  |   2 +-
 .../org/apache/sqoop/json/SubmissionBean.java   |   7 +-
 .../sqoop/json/util/SchemaSerialization.java    |   2 +-
 .../org/apache/sqoop/model/MSubmission.java     |  15 +-
 .../java/org/apache/sqoop/schema/Schema.java    |  11 +-
 .../org/apache/sqoop/schema/SchemaError.java    |   6 +
 .../apache/sqoop/schema/SchemaMatchOption.java  |  40 +++++
 .../org/apache/sqoop/schema/type/Column.java    |   6 +-
 .../json/util/TestSchemaSerialization.java      |   2 +-
 .../sqoop/connector/jdbc/TestExtractor.java     |   4 +-
 .../sqoop/connector/hdfs/HdfsConnector.java     |   5 +-
 .../connector/hdfs/HdfsConnectorError.java      |   3 +-
 .../sqoop/connector/hdfs/HdfsInitializer.java   |   2 +-
 .../sqoop/connector/hdfs/TestExtractor.java     |   4 +-
 .../idf/CSVIntermediateDataFormat.java          | 100 ++++++++----
 .../connector/idf/IntermediateDataFormat.java   |  16 +-
 .../connector/idf/matcher/AbstractMatcher.java  |  62 ++++++++
 .../connector/idf/matcher/LocationMatcher.java  |  82 ++++++++++
 .../connector/idf/matcher/NameMatcher.java      |  69 +++++++++
 .../idf/TestCSVIntermediateDataFormat.java      | 151 ++++++++++++++++++-
 .../org/apache/sqoop/driver/JobManager.java     |  15 +-
 docs/src/site/sphinx/Tools.rst                  |   3 +-
 .../apache/sqoop/job/mr/ConfigurationUtils.java |  53 ++++---
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  18 +--
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  15 +-
 .../sqoop/shell/utils/SubmissionDisplayer.java  |   4 +-
 .../main/resources/shell-resource.properties    |   5 +-
 .../mapreduce/MapreduceSubmissionEngine.java    |   7 +-
 29 files changed, 584 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index fd73890..3272b56 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -30,12 +30,10 @@ public class ExtractorContext extends TransferableContext {
 
   private DataWriter writer;
 
-  private Schema schema;
 
-  public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) {
+  public ExtractorContext(ImmutableContext context, DataWriter writer) {
     super(context);
     this.writer = writer;
-    this.schema = schema;
   }
 
   /**
@@ -47,12 +45,5 @@ public class ExtractorContext extends TransferableContext {
     return writer;
   }
 
-  /**
-   * Return schema associated with this step.
-   *
-   * @return
-   */
-  public Schema getSchema() {
-    return schema;
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/json/SchemaBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SchemaBean.java b/common/src/main/java/org/apache/sqoop/json/SchemaBean.java
index 468f7ee..f51fec8 100644
--- a/common/src/main/java/org/apache/sqoop/json/SchemaBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/SchemaBean.java
@@ -48,7 +48,7 @@ public class SchemaBean implements JsonBean {
 
   @Override
   public void restore(JSONObject jsonObject) {
-    schema = SchemaSerialization.restoreSchemna(jsonObject);
+    schema = SchemaSerialization.restoreSchema(jsonObject);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
index 9b1ae74..4b80338 100644
--- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
-import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchemna;
+import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
 
 /**
  *
@@ -188,11 +188,12 @@ public class SubmissionBean implements JsonBean {
       if(object.containsKey(COUNTERS)) {
         submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
       }
+
       if(object.containsKey(FROM_SCHEMA)) {
-        submission.setFromSchema(restoreSchemna((JSONObject) object.get(FROM_SCHEMA)));
+        submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA)));
       }
       if(object.containsKey(TO_SCHEMA)) {
-        submission.setToSchema(restoreSchemna((JSONObject) object.get(TO_SCHEMA)));
+        submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA)));
       }
 
       this.submissions.add(submission);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
index f6a9bbf..1e6da6d 100644
--- a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
+++ b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java
@@ -79,7 +79,7 @@ public class SchemaSerialization {
     return object;
   }
 
-  public static Schema restoreSchemna(JSONObject jsonObject) {
+  public static Schema restoreSchema(JSONObject jsonObject) {
     String name = (String)jsonObject.get(NAME);
     String note = (String)jsonObject.get(NOTE);
     java.util.Date date = new java.util.Date((Long)jsonObject.get(CREATION_DATE));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/model/MSubmission.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
index ca21135..7290df5 100644
--- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java
+++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java
@@ -102,17 +102,14 @@ public class MSubmission extends MAccountableEntity {
   /**
    * Schema for the FROM part of the job submission
    *
-   * This property is required.
+   * This property is required, but can be empty.
    */
   Schema fromSchema;
 
   /**
    * Schema for the TO part of the job submission
-   * Optional schema that reported by the underlying I/O implementation. Please
-   * note that this property might be empty and in such case use the FROM schema
-   * on the TO side.
    *
-   * This property is optional.
+   * This property is required, but can be empty.
    */
   Schema toSchema;
 
@@ -224,16 +221,16 @@ public class MSubmission extends MAccountableEntity {
     return fromSchema;
   }
 
-  public void setFromSchema(Schema connectorSchema) {
-    this.fromSchema = connectorSchema;
+  public void setFromSchema(Schema fromSchema) {
+    this.fromSchema = fromSchema;
   }
 
   public Schema getToSchema() {
     return toSchema;
   }
 
-  public void setToSchema(Schema hioSchema) {
-    this.toSchema = hioSchema;
+  public void setToSchema(Schema toSchema) {
+    this.toSchema = toSchema;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/schema/Schema.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
index bbebab8..40c362c 100644
--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
@@ -77,7 +77,7 @@ public class Schema {
    * same name will lead to an exception being thrown.
    *
    * @param column Column that should be added to the schema at the end.
-   * @return
+   * @return a reference to this object
    */
   public Schema addColumn(Column column) {
     if(column.getName() == null) {
@@ -121,6 +121,15 @@ public class Schema {
     return columns;
   }
 
+  public boolean isEmpty() {
+    return columns.size() == 0;
+  }
+
   public String toString() {
     return new StringBuilder("Schema{")
       .append("name=").append(name).append("")

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
index 7c8c61e..d430a64 100644
--- a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
+++ b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
@@ -30,6 +30,12 @@ public enum SchemaError implements ErrorCode {
 
   SCHEMA_0002("Duplicate column name"),
 
+  SCHEMA_0003("Source and Target schemas don't match"),
+
+  SCHEMA_0004("Non-null target column has no matching source column"),
+
+  SCHEMA_0005("No matching method available for source and target schemas")
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java b/common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java
new file mode 100644
index 0000000..e3ab026
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.schema;
+
+/**
+ * The order of the matching options here indicates an order of preference:
+ * if it is possible to use both the NAME and LOCATION matching options, we will prefer NAME.
+ *
+ * NAME - match columns in the FROM and TO schemas by column name. Data from column "hello"
+ * will be written to a column named "hello". If the TO schema doesn't have a column with
+ * an identical name, the column will be skipped.
+ *
+ * LOCATION - match columns in the FROM and TO schemas by column location.
+ * Data from the first FROM column goes into the first TO column, and so on.
+ * If the FROM link has more columns than the TO link, the extra columns will be skipped.
+ *
+ * USER_DEFINED - not implemented yet.
+ */
+public enum SchemaMatchOption {
+  NAME,
+  LOCATION,
+  // TODO: SQOOP-1546 - SQOOP2: Allow users to define their own schema matching
+  USER_DEFINED
+}
+

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/main/java/org/apache/sqoop/schema/type/Column.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Column.java b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
index 30c26a3..97bd303 100644
--- a/common/src/main/java/org/apache/sqoop/schema/type/Column.java
+++ b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
@@ -32,11 +32,15 @@ public abstract class Column {
    */
   Boolean nullable;
 
+  /**
+   * By default, columns have an empty name and are nullable.
+   */
   public Column() {
+    this("", true);
   }
 
   public Column(String name) {
-    setName(name);
+    this(name, true);
   }
 
   public Column(String name, Boolean nullable) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
index ab5bbd4..b652b32 100644
--- a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
+++ b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
@@ -172,6 +172,6 @@ public class TestSchemaSerialization {
     String transferredString = extractJson.toJSONString();
 
     JSONObject restoreJson = (JSONObject) JSONValue.parse(transferredString);
-    return SchemaSerialization.restoreSchemna(restoreJson);
+    return SchemaSerialization.restoreSchema(restoreJson);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
index 776359a..5f091de 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
@@ -81,7 +81,7 @@ public class TestExtractor extends TestCase {
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer);
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
@@ -115,7 +115,7 @@ public class TestExtractor extends TestCase {
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer);
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50 <= ICOL AND ICOL < -16");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index 70833a0..cd5350e 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -19,6 +19,7 @@
 package org.apache.sqoop.connector.hdfs;
 
 import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.VersionInfo;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
@@ -87,7 +88,9 @@ public class HdfsConnector extends SqoopConnector {
       case TO:
         return ToJobConfiguration.class;
       default:
-        return null;
+        throw new SqoopException(
+                HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0006,
+                String.valueOf(jobType));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
index 8a095d2..71f0a03 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
@@ -31,7 +31,8 @@ public enum HdfsConnectorError implements ErrorCode{
   /** The system was unable to instantiate the specified class. */
   GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
   /** Error occurs during loader run */
-  GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run")
+  GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
+  /** An unknown job type was specified. */
+  GENERIC_HDFS_CONNECTOR_0006("Unknown job type")
 
   ;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
index 923f904..c2dc1a5 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
@@ -39,6 +39,6 @@ public class HdfsInitializer extends Initializer {
 
   @Override
   public Schema getSchema(InitializerContext context, Object linkConf, Object jobConf) {
-    return null;
+    return new Schema("HDFS file");
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 7942d59..c6d2f90 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -129,8 +129,10 @@ public class TestExtractor extends TestHdfsBase {
       public void writeRecord(Object obj) {
         throw new AssertionError("Should not be writing object.");
       }
-    }, null);
+    });
+
     LinkConfiguration connConf = new LinkConfiguration();
+
     FromJobConfiguration jobConf = new FromJobConfiguration();
 
     HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index df5cb9c..2a49221 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -22,7 +22,12 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.matcher.AbstractMatcher;
+import org.apache.sqoop.connector.idf.matcher.LocationMatcher;
+import org.apache.sqoop.connector.idf.matcher.NameMatcher;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.SchemaMatchOption;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -36,6 +41,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Matcher;
 
 public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
@@ -65,7 +71,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
   private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
 
-  private Schema schema;
+  private Schema fromSchema;
+  private Schema toSchema;
 
   /**
    * {@inheritDoc}
@@ -87,19 +94,11 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    * {@inheritDoc}
    */
   @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void setSchema(Schema schema) {
+  public void setFromSchema(Schema schema) {
     if(schema == null) {
       return;
     }
-    this.schema = schema;
+    this.fromSchema = schema;
     List<Column> columns = schema.getColumns();
     int i = 0;
     for(Column col : columns) {
@@ -113,6 +112,19 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   }
 
   /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setToSchema(Schema schema) {
+    if(schema == null) {
+      return;
+    }
+    this.toSchema = schema;
+  }
+
+
+  /**
    * Custom CSV parser that honors quoting and escaped quotes.
    * All other escaping is handled elsewhere.
    *
@@ -168,6 +180,19 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
   /**
    * {@inheritDoc}
+   *
+   * The CSV data is ordered according to the fromSchema. We "translate" it to the TO schema.
+   * We currently have 3 methods of matching fields in one schema to another:
+   * - by location
+   * - by name
+   * - user-defined matching
+   *
+   * If one schema exists (either TO or FROM) and the other is empty,
+   * we'll match fields based on location.
+   * If both schemas exist, we'll match fields by name.
+   *
+   * In the future, we may want to let users choose the method.
+   * Currently nothing is implemented for user-defined matching.
    */
   @Override
   public Object[] getObjectData() {
@@ -176,51 +201,53 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       return null;
     }
 
-    if (schema == null) {
+    if (fromSchema == null || toSchema == null || (toSchema.isEmpty() && fromSchema.isEmpty())) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
     }
 
-    if (fields.length != schema.getColumns().size()) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
-        "The data " + getTextData() + " has the wrong number of fields.");
-    }
+    AbstractMatcher matcher = getMatcher(fromSchema, toSchema);
+    String[] outFields = matcher.getMatchingData(fields, fromSchema, toSchema);
+    Object[] out = new Object[outFields.length];
+
+    int i = 0;
 
-    Object[] out = new Object[fields.length];
-    Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
-    for (int i = 0; i < fields.length; i++) {
-      Type colType = cols[i].getType();
-      //TODO: Replace with proper isNull method. Actually the entire content of the loop should be a parse method
-      if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) {
+    // After getting the data back in an order that matches the output schema,
+    // we need to undo the CSV escaping.
+    for (Column col : matcher.getMatchingSchema(fromSchema, toSchema).getColumns()) {
+      Type colType = col.getType();
+      if (outFields[i] == null) {
         out[i] = null;
+        i++; // keep the column index aligned with the schema when skipping a null field
         continue;
       }
       if (colType == Type.TEXT) {
-        out[i] = unescapeStrings(fields[i]);
+        out[i] = unescapeStrings(outFields[i]);
       } else if (colType == Type.BINARY) {
-        out[i] = unescapeByteArray(fields[i]);
+        out[i] = unescapeByteArray(outFields[i]);
       } else if (colType == Type.FIXED_POINT) {
-        Long byteSize = ((FixedPoint) cols[i]).getByteSize();
+        Long byteSize = ((FixedPoint) col).getByteSize();
         if (byteSize != null && byteSize <= Integer.SIZE) {
-          out[i] = Integer.valueOf(fields[i]);
+          out[i] = Integer.valueOf(outFields[i]);
         } else {
-          out[i] = Long.valueOf(fields[i]);
+          out[i] = Long.valueOf(outFields[i]);
         }
       } else if (colType == Type.FLOATING_POINT) {
-        Long byteSize = ((FloatingPoint) cols[i]).getByteSize();
+        Long byteSize = ((FloatingPoint) col).getByteSize();
         if (byteSize != null && byteSize <= Float.SIZE) {
-          out[i] = Float.valueOf(fields[i]);
+          out[i] = Float.valueOf(outFields[i]);
         } else {
-          out[i] = Double.valueOf(fields[i]);
+          out[i] = Double.valueOf(outFields[i]);
         }
       } else if (colType == Type.DECIMAL) {
-        out[i] = new BigDecimal(fields[i]);
+        out[i] = new BigDecimal(outFields[i]);
       } else {
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
       }
+      i++;
     }
     return out;
   }
 
+
   /**
    * {@inheritDoc}
    */
@@ -353,4 +380,15 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   public String toString() {
     return data;
   }
+
+  private AbstractMatcher getMatcher(Schema fromSchema, Schema toSchema) {
+    if (toSchema.isEmpty() || fromSchema.isEmpty()) {
+      return new LocationMatcher();
+    } else {
+      return new NameMatcher();
+    }
+  }
+
 }
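
To make the matching rules above concrete, a short sketch mirroring the unit tests added
later in this patch: with an empty TO schema, getMatcher falls back to location-based
matching and the data passes through positionally (schemas and values are illustrative):

  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.schema.Schema;
  import org.apache.sqoop.schema.type.FixedPoint;

  public class LocationPathSketch {
    public static void main(String[] args) {
      Schema from = new Schema("From")
          .addColumn(new FixedPoint("1"))
          .addColumn(new FixedPoint("2"));
      Schema emptyTo = new Schema("empty"); // no columns, so isEmpty() is true

      CSVIntermediateDataFormat idf = new CSVIntermediateDataFormat();
      idf.setFromSchema(from);  // one empty schema => getMatcher picks LocationMatcher
      idf.setToSchema(emptyTo);

      idf.setTextData("10,34");
      Object[] row = idf.getObjectData(); // {10L, 34L}, in FROM order
    }
  }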

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 74b9518..d98b779 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -81,7 +81,8 @@ public abstract class IntermediateDataFormat<T> {
   /**
    * Get one row of data as CSV.
    *
-   * @return - String representing the data in CSV
+   * @return - String representing the data in CSV, according to the "FROM" schema.
+   * No schema conversion is done on textData, to keep it as the "high performance" option.
    */
   public abstract String getTextData();
 
@@ -95,6 +96,7 @@ public abstract class IntermediateDataFormat<T> {
    * Get one row of data as an Object array.
    *
    * @return - String representing the data as an Object array
+   * If both FROM and TO schemas exist, a schema matcher is used to return the data according to the "TO" schema.
    */
   public abstract Object[] getObjectData();
 
@@ -105,18 +107,18 @@ public abstract class IntermediateDataFormat<T> {
   public abstract void setObjectData(Object[] data);
 
   /**
-   * Set the schema to be used.
+   * Set the schema for reading data.
    *
-   * @param schema - the schema to be used
+   * @param schema - the schema used for reading data
    */
-  public abstract void setSchema(Schema schema);
+  public abstract void setFromSchema(Schema schema);
 
   /**
-   * Get the schema of the data.
+   * Set the schema for writing data.
    *
-   * @return - The schema of the data.
+   * @param schema - the schema used for writing data
    */
-  public abstract Schema getSchema();
+  public abstract void setToSchema(Schema schema);
 
   /**
    * Serialize the fields of this object to <code>out</code>.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
new file mode 100644
index 0000000..e6b2316
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.idf.matcher;
+
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+
+public abstract class AbstractMatcher {
+
+  // NOTE: This is currently tightly coupled to the CSV IDF. We'll need refactoring after adding additional formats.
+  // NOTE: There is a very blatant special case of empty schemas that seems to apply only to HDFS.
+
+  /**
+   *
+   * @param fields
+   * @param fromSchema
+   * @param toSchema
+   * @return the data in "fields" converted from matching the fromSchema to matching the toSchema.
+   * Right now "converted" means re-ordering if needed and handling nulls.
+   */
+  abstract public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema);
+
+  /**
+   *
+   * @param fromSchema
+   * @param toSchema
+   * @return a schema with which to read the output data.
+   * This always returns the toSchema (since this is used when getting output data), unless it's empty.
+   */
+  public Schema getMatchingSchema(Schema fromSchema, Schema toSchema) {
+    if (toSchema.isEmpty()) {
+      return fromSchema;
+    } else {
+      return toSchema;
+    }
+  }
+
+  protected boolean isNull(String value) {
+    return value.equals("NULL") || value.equals("null") || value.equals("'null'") || value.isEmpty();
+  }
+
+}
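
Since USER_DEFINED matching (SQOOP-1546) is still a TODO, here is a purely hypothetical
sketch of what a custom matcher honoring the AbstractMatcher contract could look like;
the class name and the case-insensitive rule are invented for illustration:

  package org.apache.sqoop.connector.idf.matcher;

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.sqoop.schema.Schema;
  import org.apache.sqoop.schema.type.Column;

  public class CaseInsensitiveNameMatcher extends AbstractMatcher {

    @Override
    public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
      // Index the FROM columns by lower-cased name.
      Map<String, Integer> fromIndexByName = new HashMap<String, Integer>();
      int i = 0;
      for (Column fromCol : fromSchema.getColumns()) {
        fromIndexByName.put(fromCol.getName().toLowerCase(), i++);
      }

      // Fill each TO column from the FROM field with the same (case-insensitive) name.
      String[] out = new String[toSchema.getColumns().size()];
      int toIndex = 0;
      for (Column toCol : toSchema.getColumns()) {
        Integer fromIndex = fromIndexByName.get(toCol.getName().toLowerCase());
        if (fromIndex != null && !isNull(fields[fromIndex])) {
          out[toIndex] = fields[fromIndex];
        } else {
          out[toIndex] = null; // nullability checks omitted in this sketch
        }
        toIndex++;
      }
      return out;
    }
  }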

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
new file mode 100644
index 0000000..938a5df
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.idf.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.SchemaMatchOption;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Type;
+
+import java.math.BigDecimal;
+import java.util.Iterator;
+
+
+/**
+ * Converts data matching the FROM schema into data matching the TO schema.
+ * This is done based on column location:
+ * data in the first FROM column goes into the first TO column, and so on.
+ * If the TO schema has more fields and they are nullable, the extra values will be set to null.
+ * If the TO schema has extra non-nullable fields, we'll throw an exception.
+ */
+public class LocationMatcher extends AbstractMatcher {
+
+  public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
+
+  @Override
+  public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
+
+    if (toSchema.isEmpty()) {
+      // If there's no destination schema, there's no need to convert anything;
+      // just use the original data.
+      return fields;
+    }
+
+    String[] out = new String[toSchema.getColumns().size()];
+    int i = 0;
+
+    for (Column col : toSchema.getColumns()) {
+      if (i < fields.length) {
+        if (isNull(fields[i])) {
+          out[i] = null;
+        } else {
+          out[i] = fields[i];
+        }
+      }
+      // We ran out of fields before we ran out of schema
+      else {
+        if (!col.getNullable()) {
+          throw new SqoopException(SchemaError.SCHEMA_0004,
+              "Target column " + col + " did not match any source column and cannot be null");
+        } else {
+          LOG.warn("Column " + col + " has no matching source column. It will be ignored.");
+          out[i] = null;
+        }
+      }
+      i++;
+    }
+    return out;
+  }
+
+}
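
A small usage sketch of the null-padding behavior described in the class comment, calling
the matcher directly (it assumes the schema-type imports used elsewhere in this patch;
column names are arbitrary):

  Schema from = new Schema("From")
      .addColumn(new FixedPoint("a"));
  Schema to = new Schema("To")
      .addColumn(new FixedPoint("x"))
      .addColumn(new Text("y")); // nullable by default, per the Column change above

  String[] out = new LocationMatcher().getMatchingData(new String[]{"10"}, from, to);
  // out == {"10", null}: "y" has no source field; being nullable, it is padded with null.
  // A non-nullable "y" would raise SchemaError.SCHEMA_0004 instead.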

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
new file mode 100644
index 0000000..417c85b
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.idf.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
+
+import java.util.HashMap;
+
+public class NameMatcher extends AbstractMatcher {
+  public static final Logger LOG = Logger.getLogger(NameMatcher.class);
+
+  @Override
+  public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
+    String[] out = new String[toSchema.getColumns().size()];
+
+    HashMap<String,Column> colNames = new HashMap<String, Column>();
+
+    for (Column fromCol: fromSchema.getColumns()) {
+      colNames.put(fromCol.getName(), fromCol);
+    }
+
+    int toIndex = 0;
+
+    for (Column toCol: toSchema.getColumns()) {
+      Column fromCol = colNames.get(toCol.getName());
+
+      if (fromCol != null) {
+        int fromIndex = fromSchema.getColumns().indexOf(fromCol);
+        if (isNull(fields[fromIndex])) {
+          out[toIndex] = null;
+        } else {
+          out[toIndex] = fields[fromIndex];
+        }
+      } else {
+        //column exists in TO schema but not in FROM schema
+        if (!toCol.getNullable()) {
+          throw new SqoopException(SchemaError.SCHEMA_0004,
+              "Target column " + toCol + " did not match any source column and cannot be null");
+        } else {
+          LOG.warn("Column " + toCol + " has no matching source column. It will be ignored.");
+          out[toIndex] = null;
+        }
+      }
+
+      toIndex++;
+    }
+
+    return out;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 8c83a71..3954039 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.idf;
 
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaMatchOption;
 import org.apache.sqoop.schema.type.Binary;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.Text;
@@ -39,6 +40,8 @@ public class TestCSVIntermediateDataFormat {
 
   private IntermediateDataFormat<?> data;
 
+  private Schema emptySchema = new Schema("empty");
+
   @Before
   public void setUp() {
     data = new CSVIntermediateDataFormat();
@@ -70,7 +73,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+    data.setFromSchema(schema);
     data.setTextData(null);
 
     Object[] out = data.getObjectData();
@@ -87,7 +90,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+    data.setFromSchema(schema);
     data.setTextData("");
 
     data.getObjectData();
@@ -106,7 +109,9 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+
+    data.setFromSchema(schema);
+    data.setToSchema(emptySchema);
     data.setTextData(testData);
 
     Object[] out = data.getObjectData();
@@ -129,7 +134,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+    data.setFromSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -159,7 +164,8 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+    data.setFromSchema(schema);
+    data.setToSchema(emptySchema);
 
     Object[] in = new Object[6];
     in[0] = new Long(10);
@@ -181,7 +187,9 @@ public class TestCSVIntermediateDataFormat {
   public void testStringFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Text("1"));
-    data.setSchema(schema);
+
+    data.setFromSchema(schema);
+    data.setToSchema(emptySchema);
 
     char[] allCharArr = new char[256];
     for(int i = 0; i < allCharArr.length; ++i) {
@@ -204,7 +212,8 @@ public class TestCSVIntermediateDataFormat {
   public void testByteArrayFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Binary("1"));
-    data.setSchema(schema);
+    data.setFromSchema(schema);
+    data.setToSchema(emptySchema);
 
     byte[] allCharByteArr = new byte[256];
     for(int i = 0; i < allCharByteArr.length; ++i) {
@@ -219,4 +228,132 @@ public class TestCSVIntermediateDataFormat {
     data.setObjectData(in);
     assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
   }
+
+  /**
+   * Note that we don't have an "empty TO" matching test,
+   * because most tests above already use an empty "to" schema.
+   */
+  @Test
+  public void testMatchingEmptyFrom() {
+
+    data.setFromSchema(emptySchema);
+
+    Schema toSchema = new Schema("To");
+    toSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[2];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+
+    Object[] out = new Object[2];
+    out[0] = new Long(10);
+    out[1] = new Long(34);
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
+  @Test(expected=SqoopException.class)
+  public void testMatchingTwoEmptySchema() {
+    data.setFromSchema(emptySchema);
+    data.setToSchema(emptySchema);
+
+    Object[] in = new Object[2];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+
+    data.setObjectData(in);
+
+    data.getObjectData();
+  }
+
+  @Test
+  public void testMatchingFewerFromColumns(){
+    Schema fromSchema = new Schema("From");
+    fromSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"));
+    data.setFromSchema(fromSchema);
+
+    Schema toSchema = new Schema("To");
+    toSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"))
+            .addColumn(new Text("3"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[2];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+
+    Object[] out = new Object[3];
+    out[0] = new Long(10);
+    out[1] = new Long(34);
+    out[2] = null;
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
+  @Test
+  public void testMatchingFewerToColumns(){
+    Schema fromSchema = new Schema("From");
+    fromSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"))
+            .addColumn(new FixedPoint("3"));
+    data.setFromSchema(fromSchema);
+
+    Schema toSchema = new Schema("To");
+    toSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[3];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = new Long(50);
+
+    Object[] out = new Object[2];
+    out[0] = new Long(10);
+    out[1] = new Long(34);
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
+  @Test
+  public void testWithSomeNonMatchingFields(){
+
+    Schema fromSchema = new Schema("From");
+    fromSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"))
+            .addColumn(new FixedPoint("3"));
+    data.setFromSchema(fromSchema);
+
+    Schema toSchema = new Schema("From");
+    toSchema.addColumn(new FixedPoint("2"))
+            .addColumn(new FixedPoint("3"))
+            .addColumn(new FixedPoint("4"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[3];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = new Long(50);
+
+    Object[] out = new Object[3];
+    out[0] = new Long(34);
+    out[1] = new Long(50);
+    out[2] = null;
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 277c6be..e91c436 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -374,13 +374,10 @@ public class JobManager implements Reconfigurable {
     Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
     Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
 
-    // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
-    if (fromSchema != null) {
-      jobRequest.getSummary().setFromSchema(fromSchema);
-    }
-    else {
-      jobRequest.getSummary().setFromSchema(toSchema);
-    }
+
+    jobRequest.getSummary().setFromSchema(fromSchema);
+    jobRequest.getSummary().setToSchema(toSchema);
+
     LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
     return jobRequest;
   }
@@ -458,7 +455,7 @@ public class JobManager implements Reconfigurable {
     initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getConnectorJobConfig(direction));
 
-    // TODO(Abe): Alter behavior of Schema here.
+
     return initializer.getSchema(initializerContext,
         jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getConnectorJobConfig(direction));
@@ -709,4 +706,4 @@ public class JobManager implements Reconfigurable {
       LOG.info("Ending submission manager update thread");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/docs/src/site/sphinx/Tools.rst
----------------------------------------------------------------------
diff --git a/docs/src/site/sphinx/Tools.rst b/docs/src/site/sphinx/Tools.rst
index 84cbd5f..6d36b27 100644
--- a/docs/src/site/sphinx/Tools.rst
+++ b/docs/src/site/sphinx/Tools.rst
@@ -23,8 +23,7 @@ Tools are server commands that administrators can execute on the Sqoop server ma
 In order to perform the maintenance task each tool is supposed to do, they need to be executed in exactly the same environment as the main Sqoop server. The tool binary will take care of setting up the ``CLASSPATH`` and other environment variables that might be required. However, it's up to the administrator to run the tool under the same user as is used for the server. This is usually configured automatically for various Hadoop distributions (such as Apache Bigtop).
 
 
-.. note:: Running tools under a different user such as ``root`` might prevent Sqoop Server from running correctly.
-
+.. note:: Running tools while the Sqoop Server is also running is not recommended, as it might lead to data corruption and service disruption.
+
 List of available tools:
 
 * verify

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index 2ed06a8..b533837 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -82,13 +82,13 @@ public final class ConfigurationUtils {
 
   private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB);
 
-  private static final String SCHEMA_FROM_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
+  private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
 
-  private static final Text SCHEMA_FROM_CONNECTOR_KEY = new Text(SCHEMA_FROM_CONNECTOR);
+  private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
 
-  private static final String SCHEMA_TO_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
+  private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
 
-  private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
+  private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
 
 
   /**
@@ -163,6 +163,27 @@ public final class ConfigurationUtils {
   }
 
   /**
+   * Persist Connector generated schema.
+   *
+   * @param type Direction of the schema we are persisting
+   * @param job MapReduce Job object
+   * @param schema Schema
+   */
+  public static void setConnectorSchema(Direction type, Job job, Schema schema) {
+    if(schema != null) {
+      String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString();
+      switch (type) {
+        case FROM:
+          job.getCredentials().addSecretKey(SCHEMA_FROM_KEY, jsonSchema.getBytes());
+          return;
+        case TO:
+          job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes());
+          return;
+      }
+    }
+  }
+
+  /**
    * Retrieve Connector configuration object for connection.
    *
    * @param configuration MapReduce configuration object
@@ -226,23 +247,7 @@ public final class ConfigurationUtils {
     return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY);
   }
 
-  /**
-   * Persist From Connector generated schema.
-   *
-   * @param job MapReduce Job object
-   * @param schema Schema
-   */
-  public static void setConnectorSchema(Direction type, Job job, Schema schema) {
-    if(schema != null) {
-      switch (type) {
-        case FROM:
-          job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
 
-        case TO:
-          job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
-      }
-    }
-  }
 
   /**
    * Retrieve Connector generated schema.
@@ -253,10 +258,10 @@ public final class ConfigurationUtils {
   public static Schema getConnectorSchema(Direction type, Configuration configuration) {
     switch (type) {
       case FROM:
-        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_CONNECTOR_KEY));
+        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY));
 
       case TO:
-        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_CONNECTOR_KEY));
+        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY));
     }
 
     return null;
@@ -274,7 +279,9 @@ public final class ConfigurationUtils {
     if(bytes == null) {
       return null;
     }
-    return SchemaSerialization.restoreSchemna((JSONObject) JSONValue.parse(new String(bytes)));
+
+    JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes));
+    return SchemaSerialization.restoreSchema(jsonSchema);
   }
 
   /**
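
A hedged sketch of the intended round trip: the persist side is wired up in
MapreduceSubmissionEngine below, and retrieval happens inside the running task, where the
job credentials are available on the task's JobConf (fromSchema, toSchema and conf are
placeholders here):

  // Submission side: serialize each schema into the job credentials.
  Job job = Job.getInstance();
  ConfigurationUtils.setConnectorSchema(Direction.FROM, job, fromSchema);
  ConfigurationUtils.setConnectorSchema(Direction.TO, job, toSchema);

  // Task side (e.g. SqoopMapper.run): read them back from the task configuration.
  Schema from = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
  Schema to = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);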

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 6680f60..8c88d52 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -64,20 +64,18 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
-    // TODO(Abe/Gwen): Change to conditional choosing between Connector schemas.
-    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
-    if (schema == null) {
-      schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
-    }
 
-    if (schema == null) {
-      LOG.info("setting an empty schema");
-    }
+    Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
 
     String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
     dataFormat = (IntermediateDataFormat<String>) ClassUtils
         .instantiate(intermediateDataFormatName);
-    dataFormat.setSchema(schema);
+
+    dataFormat.setFromSchema(fromSchema);
+    dataFormat.setToSchema(toSchema);
+
     dataOut = new SqoopWritable();
 
     // Objects that should be passed to the Executor execution
@@ -86,7 +84,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
-    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), schema);
+    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context));
 
     try {
       LOG.info("Starting progress service");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index eea0623..941b31d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -73,12 +73,11 @@ public class SqoopOutputFormatLoadExecutor {
     dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
       .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
 
-    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
-    if (schema==null) {
-      schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
-    }
+    Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
+    dataFormat.setFromSchema(fromSchema);
 
-    dataFormat.setSchema(schema);
+    Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
+    dataFormat.setToSchema(toSchema);
   }
 
   public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
@@ -231,10 +230,8 @@ public class SqoopOutputFormatLoadExecutor {
         Schema schema = null;
 
         if (!isTest) {
-          // Propagate connector schema in every case for now
-          // TODO: Change to coditional choosing between Connector schemas.
-          // @TODO(Abe): Maybe use TO schema?
-          schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+          // Using the TO schema since the IDF returns data in TO schema
+          schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
 
           subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
           configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
index 60acfb6..0e2a38d 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
@@ -68,12 +68,12 @@ public final class SubmissionDisplayer {
       }
     }
 
-    if(isVerbose() && submission.getFromSchema() != null) {
+    if(isVerbose() && submission.getFromSchema() != null && !submission.getFromSchema().isEmpty() ) {
       print(resourceString(Constants.RES_FROM_SCHEMA)+": ");
       println(submission.getFromSchema());
     }
 
-    if(isVerbose() && submission.getToSchema() != null) {
+    if(isVerbose() && submission.getToSchema() != null && !submission.getToSchema().isEmpty() ) {
       print(resourceString(Constants.RES_TO_SCHEMA)+": ");
       println(submission.getToSchema());
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/shell/src/main/resources/shell-resource.properties
----------------------------------------------------------------------
diff --git a/shell/src/main/resources/shell-resource.properties b/shell/src/main/resources/shell-resource.properties
index b59bd81..c0f86f7 100644
--- a/shell/src/main/resources/shell-resource.properties
+++ b/shell/src/main/resources/shell-resource.properties
@@ -227,5 +227,6 @@ submission.progress_not_available = Progress is not available
 submission.counters = Counters
 submission.executed_success = Job executed successfully
 submission.server_url = Server URL
-submission.from_schema = From schema
-submission.to_schema = To schema
\ No newline at end of file
+submission.from_schema = Source Connector schema
+submission.to_schema = Target Connector schema
+

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c20d920/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index fe92ac4..25255ae 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -206,11 +206,13 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getConnectorJobConfig(Direction.FROM));
       ConfigurationUtils.setConnectorConnectionConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
       ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getConnectorJobConfig(Direction.TO));
+
       ConfigurationUtils.setFrameworkConnectionConfig(Direction.FROM, job, request.getFrameworkLinkConfig(Direction.FROM));
       ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, request.getFrameworkLinkConfig(Direction.TO));
       ConfigurationUtils.setFrameworkJobConfig(job, request.getFrameworkJobConfig());
-      // @TODO(Abe): Persist TO schema.
+
       ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
+      ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());
@@ -413,4 +415,5 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     return "local".equals(globalConfiguration.get("mapreduce.jobtracker.address"))
         || "local".equals(globalConfiguration.get("mapred.job.tracker"));
   }
-}
\ No newline at end of file
+
+}